diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/MockDomainNameResolver.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/MockDomainNameResolver.java index 10416e22116b6..4ae0892bc3c4a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/MockDomainNameResolver.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/MockDomainNameResolver.java @@ -95,4 +95,9 @@ public String[] getAllResolvedHostnameByDomainName( public void setAddressMap(Map addresses) { this.addrs = addresses; } + + @VisibleForTesting + public void setPtrMap(Map ptrMap) { + this.ptrMap = ptrMap; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java index c85e211bd31c9..bb819814e724c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java @@ -19,11 +19,22 @@ package org.apache.hadoop.yarn.conf; import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.DomainNameResolver; +import org.apache.hadoop.net.DomainNameResolverFactory; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -39,8 +50,29 @@ public class HAUtil { public static final String BAD_CONFIG_MESSAGE_PREFIX = "Invalid configuration! "; + private final static List RM_ADDRESS_CONFIG_KEYS = Arrays.asList( + YarnConfiguration.RM_ADDRESS, + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.RM_ADMIN_ADDRESS, + YarnConfiguration.RM_WEBAPP_ADDRESS, + YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + ); + + private static DomainNameResolver dnr; + private static Collection originalRMIDs = null; + private HAUtil() { /* Hidden constructor */ } + public static DomainNameResolver getDnr() { + return dnr; + } + + public static void setDnrByConfiguration(Configuration conf) { + HAUtil.dnr = DomainNameResolverFactory.newInstance( + conf, YarnConfiguration.RESOLVE_RM_ADDRESS_KEY); + } + private static void throwBadConfigurationException(String msg) { throw new YarnRuntimeException(BAD_CONFIG_MESSAGE_PREFIX + msg); } @@ -118,6 +150,12 @@ public static void verifyAndSetConfiguration(Configuration conf) * Then set the RM-ids. */ private static void verifyAndSetRMHAIdsList(Configuration conf) { + boolean resolveNeeded = conf.getBoolean( + YarnConfiguration.RESOLVE_RM_ADDRESS_NEEDED_KEY, + YarnConfiguration.RESOLVE_RM_ADDRESS_NEEDED_DEFAULT); + if (resolveNeeded) { + getRMHAId(conf); + } Collection ids = conf.getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS); if (ids.size() < 2) { @@ -227,6 +265,16 @@ public static Collection getRMHAIds(Configuration conf) { return conf.getStringCollection(YarnConfiguration.RM_HA_IDS); } + /** + * Instead of returning RM_HA_IDS in current configurations, it + * would return the originally preset one in case of DNS resolving. + * @param conf Configuration. + * @return RM Ids from original xml file + */ + public static Collection getOriginalRMHAIds(Configuration conf) { + return originalRMIDs == null ? getRMHAIds(conf) : originalRMIDs; + } + /** * @param conf Configuration. Please use verifyAndSetRMHAId to check. * @return RM Id on success @@ -235,19 +283,10 @@ public static String getRMHAId(Configuration conf) { int found = 0; String currentRMId = conf.getTrimmed(YarnConfiguration.RM_HA_ID); if(currentRMId == null) { - for(String rmId : getRMHAIds(conf)) { - String key = addSuffix(YarnConfiguration.RM_ADDRESS, rmId); - String addr = conf.get(key); - if (addr == null) { - continue; - } - InetSocketAddress s; - try { - s = NetUtils.createSocketAddr(addr); - } catch (Exception e) { - LOG.warn("Exception in creating socket address " + addr, e); - continue; - } + Map idAddressPairs = getResolvedRMIdPairs(conf); + for (Map.Entry entry : idAddressPairs.entrySet()) { + String rmId = entry.getKey(); + InetSocketAddress s = entry.getValue(); if (!s.isUnresolved() && NetUtils.isLocalAddress(s.getAddress())) { currentRMId = rmId.trim(); found++; @@ -262,6 +301,177 @@ public static String getRMHAId(Configuration conf) { return currentRMId; } + /** + * This function resolves all RMIds with their address. For multi-A DNS records, + * it will resolve all of them, and generate a new Id for each of them. + * + * @param conf Configuration + * @return Map key as RMId, value as its address + */ + public static Map getResolvedRMIdPairs( + Configuration conf) { + boolean resolveNeeded = conf.getBoolean( + YarnConfiguration.RESOLVE_RM_ADDRESS_NEEDED_KEY, + YarnConfiguration.RESOLVE_RM_ADDRESS_NEEDED_DEFAULT); + boolean requireFQDN = conf.getBoolean( + YarnConfiguration.RESOLVE_RM_ADDRESS_TO_FQDN, + YarnConfiguration.RESOLVE_RM_ADDRESS_TO_FQDN_DEFAULT); + // In case client using DIFFERENT addresses for each service address + // need to categorize them first + Map, List> addressesConfigKeysMap = new HashMap<>(); + Collection rmIds = getOriginalRMHAIds(conf); + for (String configKey : RM_ADDRESS_CONFIG_KEYS) { + List addresses = new ArrayList<>(); + for (String rmId : rmIds) { + String keyToRead = addSuffix(configKey, rmId); + InetSocketAddress address = getInetSocketAddressFromString( + conf.get(keyToRead)); + if (address != null) { + addresses.add(address.getHostName()); + } + } + Collections.sort(addresses); + List configKeysOfTheseAddresses = addressesConfigKeysMap.get(addresses); + if (configKeysOfTheseAddresses == null) { + configKeysOfTheseAddresses = new ArrayList<>(); + addressesConfigKeysMap.put(addresses, configKeysOfTheseAddresses); + } + configKeysOfTheseAddresses.add(configKey); + } + // We need to resolve and override by group (categorized by their input host) + // But since the function is called from "getRMHAId", + // this function would only return value which is corresponded to YarnConfiguration.RM_ADDRESS + Map ret = null; + for (List configKeys : addressesConfigKeysMap.values()) { + Map res = getResolvedIdPairs( + conf, resolveNeeded, requireFQDN, getOriginalRMHAIds(conf), + configKeys.get(0), YarnConfiguration.RM_HA_IDS, configKeys); + if (configKeys.contains(YarnConfiguration.RM_ADDRESS)) { + ret = res; + } + } + return ret; + } + + private static Map getResolvedIdPairs( + Configuration conf, boolean resolveNeeded, boolean requireFQDN, Collection ids, + String configKey, String configKeyToReplace, List listOfConfigKeysToReplace) { + Map idAddressPairs = new HashMap<>(); + Map generatedIdToOriginalId = new HashMap<>(); + for (String id : ids) { + String key = addSuffix(configKey, id); + String addr = conf.get(key); // string with port + InetSocketAddress address = getInetSocketAddressFromString(addr); + if (address == null) { + continue; + } + if (resolveNeeded) { + if (dnr == null) { + setDnrByConfiguration(conf); + } + // If the address needs to be resolved, get all of the IP addresses + // from this address and pass them into the map + LOG.info("Multi-A domain name {} will be resolved by {}" + addr, dnr.getClass().getName()); + int port = address.getPort(); + String[] resolvedHostNames; + try { + resolvedHostNames = dnr.getAllResolvedHostnameByDomainName( + address.getHostName(), requireFQDN); + } catch (UnknownHostException e) { + LOG.warn("Exception in resolving socket address {}", address.getHostName(), e); + continue; + } + LOG.info("Resolved addresses for {} is {}", addr, Arrays.toString(resolvedHostNames)); + if (resolvedHostNames == null || resolvedHostNames.length < 1) { + LOG.warn("Cannot resolve from address {}", address.getHostName()); + } else { + // If multiple address resolved, corresponding id needs to be created + for (int i = 0; i < resolvedHostNames.length; i++) { + String generatedRMId = id + "_resolved_" + (i + 1); + idAddressPairs.put(generatedRMId, + new InetSocketAddress(resolvedHostNames[i], port)); + generatedIdToOriginalId.put(generatedRMId, id); + } + } + overrideIdsInConfiguration( + idAddressPairs, generatedIdToOriginalId, configKeyToReplace, + listOfConfigKeysToReplace, conf); + } else { + idAddressPairs.put(id, address); + } + } + return idAddressPairs; + } + + /** + * This function override all RMIds and their addresses by the input Map. + * + * @param idAddressPairs key as Id, value as its address + * @param generatedIdToOriginalId key as generated rmId from multi-A, + * value as its original input Id + * @param configKeyToReplace key to replace + * @param listOfConfigKeysToReplace list of keys to replace/add + * @param conf Configuration + */ + synchronized static void overrideIdsInConfiguration( + Map idAddressPairs, + Map generatedIdToOriginalId, + String configKeyToReplace, List listOfConfigKeysToReplace, + Configuration conf) { + Collection currentIds = getRMHAIds(conf); + Set resolvedIds = new HashSet<>(idAddressPairs.keySet()); + // override rm-ids + if (originalRMIDs == null) { + originalRMIDs = currentIds; + } + // if it is already resolved, we need to form a superset + resolvedIds.addAll((currentIds)); + resolvedIds.removeAll(generatedIdToOriginalId.values()); + conf.setStrings(configKeyToReplace, + resolvedIds.toArray(new String[0])); + // override/add address configuration entries for each rm-id + for (Map.Entry entry : idAddressPairs.entrySet()) { + String rmId = entry.getKey(); + String addr = entry.getValue().getHostName(); + String originalRMId = generatedIdToOriginalId.get(rmId); + if (originalRMId != null) { + // for each required configKeys, get its port and then set it back + for (String configKey : listOfConfigKeysToReplace) { + String keyToRead = addSuffix(configKey, originalRMId); + InetSocketAddress originalAddress = getInetSocketAddressFromString( + conf.get(keyToRead)); + if (originalAddress == null) { + LOG.warn("Missing configuration for key {}", keyToRead); + continue; + } + int port = originalAddress.getPort(); + String keyToWrite = addSuffix(configKey, rmId); + conf.setSocketAddr(keyToWrite, new InetSocketAddress(addr, port)); + } + } + } + } + + /** + * Helper function to create InetsocketAddress from string address. + * + * @param addr string format of address accepted by NetUtils.createSocketAddr + * @return InetSocketAddress of input, would return null upon any kinds of invalid inout + */ + public static InetSocketAddress getInetSocketAddressFromString(String addr) { + if (addr == null) { + return null; + } + InetSocketAddress address; + try { + address = NetUtils.createSocketAddr(addr); + } catch (Exception e) { + LOG.warn("Exception in creating socket address {}", addr, e); + return null; + } + return address; + } + @VisibleForTesting static String getNeedToSetValueMessage(String confKey) { return confKey + " needs to be set in an HA configuration."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 316a6421889bd..dcfca715d5f45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -895,6 +895,15 @@ public static boolean isAclEnabled(Configuration conf) { public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids"; public static final String RM_HA_ID = RM_HA_PREFIX + "id"; + /** YARN DNS resolving related configs. */ + public static final String RESOLVE_RM_ADDRESS_NEEDED_KEY = RM_HA_PREFIX + "resolve-needed"; + public static final boolean RESOLVE_RM_ADDRESS_NEEDED_DEFAULT = false; + public static final String RESOLVE_RM_ADDRESS_KEY = RM_HA_PREFIX + "resolver.impl"; + public static final String RESOLVE_RM_ADDRESS_TO_FQDN = RM_HA_PREFIX + "resolver.useFQDN"; + public static final boolean RESOLVE_RM_ADDRESS_TO_FQDN_DEFAULT = true; + public static final String RM_ID_REFRESH_INTERVAL = RM_HA_PREFIX + "refresh-period-ms"; + public static final long RM_ID_REFRESH_INTERVAL_DEFAULT = -1; + /** Store the related configuration files in File System */ public static final String FS_BASED_RM_CONF_STORE = RM_PREFIX + "configuration.file-system-based-store"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java index 35b1906698b7f..52354fff65568 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java @@ -148,7 +148,13 @@ public static Text getTokenService(Configuration conf, String address, // Build a list of service addresses to form the service name ArrayList services = new ArrayList(); YarnConfiguration yarnConf = new YarnConfiguration(conf); - for (String rmId : HAUtil.getRMHAIds(conf)) { + boolean resolveNeeded = yarnConf.getBoolean( + YarnConfiguration.RESOLVE_RM_ADDRESS_NEEDED_KEY, + YarnConfiguration.RESOLVE_RM_ADDRESS_NEEDED_DEFAULT); + if (resolveNeeded) { + HAUtil.getResolvedRMIdPairs(yarnConf); + } + for (String rmId : HAUtil.getRMHAIds(yarnConf)) { // Set RM_ID to get the corresponding RM_ADDRESS yarnConf.set(YarnConfiguration.RM_HA_ID, rmId); services.add(SecurityUtil.buildTokenService( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java index 89c8753868efc..90bce9c006258 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java @@ -50,6 +50,9 @@ public class ConfiguredRMFailoverProxyProvider protected YarnConfiguration conf; protected String[] rmServiceIds; + private long rmIdsLastRefreshTime; + private long rmIdsRefreshInterval; + @Override public void init(Configuration configuration, RMProxy rmProxy, Class protocol) { @@ -57,9 +60,9 @@ public void init(Configuration configuration, RMProxy rmProxy, this.protocol = protocol; this.rmProxy.checkAllowedProtocols(this.protocol); this.conf = new YarnConfiguration(configuration); - Collection rmIds = HAUtil.getRMHAIds(conf); - this.rmServiceIds = rmIds.toArray(new String[rmIds.size()]); - conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]); + this.rmIdsRefreshInterval = conf.getLong(YarnConfiguration.RM_ID_REFRESH_INTERVAL, + YarnConfiguration.RM_ID_REFRESH_INTERVAL_DEFAULT); + refreshRMIds(true); conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, @@ -84,6 +87,9 @@ protected T getProxyInternal() { @Override public synchronized ProxyInfo getProxy() { + if (shouldRefreshAddress()) { + refreshRMIds(false); + } String rmId = rmServiceIds[currentProxyIndex]; T current = proxies.get(rmId); if (current == null) { @@ -119,4 +125,47 @@ public synchronized void close() throws IOException { } } } + + private boolean shouldRefreshAddress() { + long currentTime = System.currentTimeMillis(); + return rmIdsRefreshInterval > 0 && + currentTime > rmIdsLastRefreshTime + rmIdsRefreshInterval; + } + + private synchronized void refreshRMIds(boolean forceRefresh) throws IllegalStateException { + if (!forceRefresh && !shouldRefreshAddress()) { + return; + } + String currentRMId = HAUtil.getRMHAId(conf); + InetSocketAddress currentAddress = null; + if (currentRMId != null) { + // explicitly refresh it if not done + currentAddress = HAUtil.getInetSocketAddressFromString(conf.get(HAUtil.addSuffix( + YarnConfiguration.RM_ADDRESS, currentRMId))); + HAUtil.getResolvedRMIdPairs(conf); + } + rmIdsLastRefreshTime = System.currentTimeMillis(); + Collection rmIds = HAUtil.getRMHAIds(conf); + if (rmIds == null || rmIds.isEmpty()) { + String message = "no instances configured."; + LOG.error(message); + throw new IllegalStateException(message); + } + proxies.clear(); + this.rmServiceIds = rmIds.toArray(new String[rmIds.size()]); + // After refresh, we should keep current rm pointing to the previous active one + if (currentRMId != null) { + for (int i = 0; i < rmServiceIds.length; i++) { + String confKey = HAUtil.addSuffix( + YarnConfiguration.RM_ADDRESS, rmServiceIds[i]); + InetSocketAddress thatAddress = + HAUtil.getInetSocketAddressFromString(conf.get(confKey)); + if (currentAddress.equals(thatAddress)) { + currentProxyIndex = i; + break; + } + } + } + conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 80672fb1cc8e4..56d61e779c6a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -668,6 +668,49 @@ false + + + Determines what class to use to resolve resourcemanager address to specific machine + address(es). + + yarn.resourcemanager.ha.resolver.impl + org.apache.hadoop.net.DNSDomainNameResolver + + + + + Determines if the given resourcemanager address is a domain name which needs to + be resolved (using the resolver configured by yarn.resourcemanager.ha.resolver.impl). + This adds a transparency layer in the client so physical server address + can change without changing the client. + + yarn.resourcemanager.ha.resolve-needed + false + + + + + The amount of milliseconds between DNS address re-resolving. + By default, this parameter is set to -1 which disabled the DNS auto-refresh functionality. + Enabling the auto-refresh would help long running sessions (including node managers) able to + find the replaced resource managers (e.g. during node replacement/migration) without downtime. + + yarn.resourcemanager.ha.refresh-period-ms + -1 + + + + + Determines whether the resolved result is fully qualified domain name instead + of pure IP address(es). + In secure environment, this has to be enabled since Kerberos is using fqdn + in machine's principal therefore accessing servers by IP won't be recognized + by the KDC. + + yarn.resourcemanager.ha.resolver.useFQDN + true + + Enable automatic failover. By default, it is enabled only when HA is enabled diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestConfiguredRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestConfiguredRMFailoverProxyProvider.java new file mode 100644 index 0000000000000..ed307e4171335 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestConfiguredRMFailoverProxyProvider.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.net.MockDomainNameResolver; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; + +public class TestConfiguredRMFailoverProxyProvider { + private final static List HOST_LIST = Arrays.asList( + "host01", + "host02", + "host03", + "host04" + ); + private final static String SCHEDULER_HOST_ADDRESS = "host11"; + private final static List IP_LIST = Arrays.asList( + new byte[] {10, 0, 0, 1}, + new byte[] {10, 0, 0, 2}, + new byte[] {10, 0, 0, 3}, + new byte[] {10, 0, 0, 4} + ); + private final static String MULTI_A_RM_ID = "rm"; + private final static String MULTI_A_RM_ADDRESS = "rm.com"; + private final static String MULTI_A_RM_SCHEDULER_ADDRESS = "rmscheduler.com"; + private final static long REFRESH_TIME_INTERVAL = 1000; // unit ms + + public Configuration getDNSConfig() { + Configuration dnsConf = new Configuration(); + dnsConf.set(YarnConfiguration.RESOLVE_RM_ADDRESS_KEY, MockDomainNameResolver.class.getName()); + dnsConf.set(YarnConfiguration.RM_HA_IDS, MULTI_A_RM_ID); + dnsConf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, MULTI_A_RM_ID), + MULTI_A_RM_ADDRESS + ":" + YarnConfiguration.DEFAULT_RM_PORT); + dnsConf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, MULTI_A_RM_ID), + MULTI_A_RM_ADDRESS + ":" + YarnConfiguration.DEFAULT_RM_ADMIN_PORT); + dnsConf.set(HAUtil.addSuffix(YarnConfiguration.RM_SCHEDULER_ADDRESS, MULTI_A_RM_ID), + MULTI_A_RM_SCHEDULER_ADDRESS + ":" + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + dnsConf.setBoolean(YarnConfiguration.RESOLVE_RM_ADDRESS_NEEDED_KEY, true); + dnsConf.setBoolean(YarnConfiguration.RESOLVE_RM_ADDRESS_TO_FQDN, true); + return dnsConf; + } + + /* + * Override the MockDomainNameResolver address mapping in HAUtil + * Source of mapping is from HOST_LIST and IP_LIST except for the exludeIdx one + */ + private void overrideDNSMapping(int excludeIdx) throws UnknownHostException { + // Mapping of domain names and IP addresses + Map addressMap = new HashMap<>(); + // Mapping from IP addresses to fqdns + Map ptrMap = new HashMap<>(); + int idx = 0; + InetAddress[] addresses = new InetAddress[HOST_LIST.size() - 1]; + InetAddress[] schedulerAddresses = new InetAddress[1]; + addressMap.put(MULTI_A_RM_ADDRESS, addresses); + addressMap.put(MULTI_A_RM_SCHEDULER_ADDRESS, schedulerAddresses); + // for addresses + for (int i = 0; i < HOST_LIST.size(); i++) { + if (i == excludeIdx) { + continue; + } + InetAddress address = InetAddress.getByAddress(IP_LIST.get(i)); + String host = HOST_LIST.get(i); + addresses[idx++] = address; + ptrMap.put(address, host); + } + // for schedulerAddress + InetAddress schedulerAddress = InetAddress.getByAddress(new byte[] {10, 1, 1, 1}); + schedulerAddresses[0] = schedulerAddress; + ptrMap.put(schedulerAddress, SCHEDULER_HOST_ADDRESS); + ((MockDomainNameResolver)HAUtil.getDnr()).setAddressMap(addressMap); + ((MockDomainNameResolver)HAUtil.getDnr()).setPtrMap(ptrMap); + } + + @SuppressWarnings("unchecked") // mock generics + private void initProxyProvider( + Configuration conf, + ConfiguredRMFailoverProxyProvider proxyProvider) { + RMProxy rm1Mock = mock(RMProxy.class); + doNothing().when(rm1Mock).checkAllowedProtocols(ApplicationClientProtocol.class); + proxyProvider.init(conf, rm1Mock, ApplicationClientProtocol.class); + } + + @Test(expected = IllegalStateException.class) + public void testInitWithNoInstances() { + Configuration conf = new Configuration(); + ConfiguredRMFailoverProxyProvider proxyProviderWithNoInstances = + new ConfiguredRMFailoverProxyProvider(); + initProxyProvider(conf, proxyProviderWithNoInstances); + } + + @Test + public void testGetProxy() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3,rm4"); + ConfiguredRMFailoverProxyProvider proxyProvider = + new ConfiguredRMFailoverProxyProvider(); + initProxyProvider(conf, proxyProvider); + assertEquals(proxyProvider.rmServiceIds[0], "rm1"); + FailoverProxyProvider.ProxyInfo current = + proxyProvider.getProxy(); + assertEquals(current.proxyInfo, "rm1"); + } + + @Test + public void testInitWithMultiARecordRM() throws UnknownHostException { + Configuration conf = getDNSConfig(); + ConfiguredRMFailoverProxyProvider proxyProviderWithmultiA = + new ConfiguredRMFailoverProxyProvider(); + HAUtil.setDnrByConfiguration(conf); + overrideDNSMapping(1); + initProxyProvider(conf, proxyProviderWithmultiA); + assertEquals(HOST_LIST.size() - 1, proxyProviderWithmultiA.rmServiceIds.length); + for (String rmId : proxyProviderWithmultiA.rmServiceIds) { + assertTrue(rmId.startsWith("rm_resolved_")); + } + } + + @Test + public void testResolveDifferentMultiARecordRM() throws UnknownHostException { + Configuration conf = getDNSConfig(); + ConfiguredRMFailoverProxyProvider proxyProviderWithmultiA = + new ConfiguredRMFailoverProxyProvider(); + HAUtil.setDnrByConfiguration(conf); + overrideDNSMapping(1); + initProxyProvider(conf, proxyProviderWithmultiA); + List rmAddresses = getRMAddresses(proxyProviderWithmultiA.conf, + proxyProviderWithmultiA.rmServiceIds, + YarnConfiguration.RM_ADDRESS); + List rmAdminAddresses = getRMAddresses(proxyProviderWithmultiA.conf, + proxyProviderWithmultiA.rmServiceIds, + YarnConfiguration.RM_ADMIN_ADDRESS); + List rmSchedulerAddresses = getRMAddresses(proxyProviderWithmultiA.conf, + proxyProviderWithmultiA.rmServiceIds, + YarnConfiguration.RM_SCHEDULER_ADDRESS); + assertEquals(HOST_LIST.size() - 1, rmAddresses.size()); + assertEquals(HOST_LIST.size() - 1, rmAdminAddresses.size()); + assertEquals(1, rmSchedulerAddresses.size()); + assertEquals(SCHEDULER_HOST_ADDRESS + ":8030", rmSchedulerAddresses.get(0)); + } + + @Test + public void testRefreshWithMultiARecordRM() throws UnknownHostException, InterruptedException { + Configuration conf = getDNSConfig(); + conf.setLong(YarnConfiguration.RM_ID_REFRESH_INTERVAL, REFRESH_TIME_INTERVAL); + ConfiguredRMFailoverProxyProvider proxyProviderWithmultiA = + new ConfiguredRMFailoverProxyProvider(); + HAUtil.setDnrByConfiguration(conf); + overrideDNSMapping(0); + initProxyProvider(conf, proxyProviderWithmultiA); + InetSocketAddress oldActiveRMAddress = + conf.getSocketAddr(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, HAUtil.getRMHAId(conf)), + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); + List oldRMAddresses = getRMAddresses( + conf, proxyProviderWithmultiA.rmServiceIds, YarnConfiguration.RM_ADDRESS); + for (String address : oldRMAddresses) { + assertTrue(!address.equals(HOST_LIST.get(0))); + } + // refresh + overrideDNSMapping(2); + Thread.sleep(REFRESH_TIME_INTERVAL * 2); + InetSocketAddress newActiveRMAddress = + conf.getSocketAddr(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, HAUtil.getRMHAId(conf)), + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); + List newRMAddresses = getRMAddresses( + conf, proxyProviderWithmultiA.rmServiceIds, YarnConfiguration.RM_ADDRESS); + assertEquals(HOST_LIST.size() - 1, newRMAddresses.size()); + // active RM should remain same, even without failover + assertEquals(oldActiveRMAddress, newActiveRMAddress); + for (String address : newRMAddresses) { + assertTrue(!address.equals(HOST_LIST.get(2))); + } + } + + private List getRMAddresses(Configuration conf, String[] rmServiceIds, + String configKey) { + List addresses = new ArrayList<>(); + for (String rmId : rmServiceIds) { + String address = conf.get(HAUtil.addSuffix(configKey, rmId)); + if (address != null) { + addresses.add(address); + } + } + return addresses; + } +}