Skip to content

Commit

Permalink
HDFS-8185. Separate client related routines in HAUtil into a new clas…
Browse files Browse the repository at this point in the history
…s. Contributed by Haohui Mai.
  • Loading branch information
Haohui Mai committed Apr 22, 2015
1 parent 674c7ef commit 6f8003d
Show file tree
Hide file tree
Showing 50 changed files with 448 additions and 401 deletions.
Expand Up @@ -17,11 +17,28 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;


import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;


public class DFSUtilClient { public class DFSUtilClient {
private static final Logger LOG = LoggerFactory.getLogger(
DFSUtilClient.class);
/** /**
* Converts a byte array to a string using UTF8 encoding. * Converts a byte array to a string using UTF8 encoding.
*/ */
Expand All @@ -44,6 +61,58 @@ public static String percent2String(double percentage) {
return StringUtils.format("%.2f%%", percentage); return StringUtils.format("%.2f%%", percentage);
} }


/**
* Returns collection of nameservice Ids from the configuration.
* @param conf configuration
* @return collection of nameservice Ids, or null if not specified
*/
public static Collection<String> getNameServiceIds(Configuration conf) {
return conf.getTrimmedStringCollection(DFS_NAMESERVICES);
}

/**
* Namenode HighAvailability related configuration.
* Returns collection of namenode Ids from the configuration. One logical id
* for each namenode in the in the HA setup.
*
* @param conf configuration
* @param nsId the nameservice ID to look at, or null for non-federated
* @return collection of namenode Ids
*/
public static Collection<String> getNameNodeIds(Configuration conf, String nsId) {
String key = addSuffix(DFS_HA_NAMENODES_KEY_PREFIX, nsId);
return conf.getTrimmedStringCollection(key);
}

/** Add non empty and non null suffix to a key */
static String addSuffix(String key, String suffix) {
if (suffix == null || suffix.isEmpty()) {
return key;
}
assert !suffix.startsWith(".") :
"suffix '" + suffix + "' should not already have '.' prepended.";
return key + "." + suffix;
}

/**
* Returns list of InetSocketAddress corresponding to HA NN HTTP addresses from
* the configuration.
*
* @return list of InetSocketAddresses
*/
public static Map<String, Map<String, InetSocketAddress>> getHaNnWebHdfsAddresses(
Configuration conf, String scheme) {
if (WebHdfsConstants.WEBHDFS_SCHEME.equals(scheme)) {
return getAddresses(conf, null,
HdfsClientConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
} else if (WebHdfsConstants.SWEBHDFS_SCHEME.equals(scheme)) {
return getAddresses(conf, null,
HdfsClientConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY);
} else {
throw new IllegalArgumentException("Unsupported scheme: " + scheme);
}
}

/** /**
* Decode a specific range of bytes of the given byte array to a string * Decode a specific range of bytes of the given byte array to a string
* using UTF8. * using UTF8.
Expand All @@ -62,4 +131,107 @@ private static String bytes2String(byte[] bytes, int offset, int length) {
return null; return null;
} }


/**
* @return <code>coll</code> if it is non-null and non-empty. Otherwise,
* returns a list with a single null value.
*/
static Collection<String> emptyAsSingletonNull(Collection<String> coll) {
if (coll == null || coll.isEmpty()) {
return Collections.singletonList(null);
} else {
return coll;
}
}

/** Concatenate list of suffix strings '.' separated */
static String concatSuffixes(String... suffixes) {
if (suffixes == null) {
return null;
}
return Joiner.on(".").skipNulls().join(suffixes);
}

/**
* Returns the configured address for all NameNodes in the cluster.
* @param conf configuration
* @param defaultAddress default address to return in case key is not found.
* @param keys Set of keys to look for in the order of preference
* @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
*/
static Map<String, Map<String, InetSocketAddress>>
getAddresses(Configuration conf, String defaultAddress, String... keys) {
Collection<String> nameserviceIds = getNameServiceIds(conf);
return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys);
}

/**
* Returns the configured address for all NameNodes in the cluster.
* @param conf configuration
* @param defaultAddress default address to return in case key is not found.
* @param keys Set of keys to look for in the order of preference
*
* @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
*/
static Map<String, Map<String, InetSocketAddress>>
getAddressesForNsIds(
Configuration conf, Collection<String> nsIds, String defaultAddress,
String... keys) {
// Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
// across all of the configured nameservices and namenodes.
Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();
for (String nsId : emptyAsSingletonNull(nsIds)) {
Map<String, InetSocketAddress> isas =
getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
if (!isas.isEmpty()) {
ret.put(nsId, isas);
}
}
return ret;
}

static Map<String, InetSocketAddress> getAddressesForNameserviceId(
Configuration conf, String nsId, String defaultValue, String... keys) {
Collection<String> nnIds = getNameNodeIds(conf, nsId);
Map<String, InetSocketAddress> ret = Maps.newHashMap();
for (String nnId : emptyAsSingletonNull(nnIds)) {
String suffix = concatSuffixes(nsId, nnId);
String address = getConfValue(defaultValue, suffix, conf, keys);
if (address != null) {
InetSocketAddress isa = NetUtils.createSocketAddr(address);
if (isa.isUnresolved()) {
LOG.warn("Namenode for " + nsId +
" remains unresolved for ID " + nnId +
". Check your hdfs-site.xml file to " +
"ensure namenodes are configured properly.");
}
ret.put(nnId, isa);
}
}
return ret;
}

/**
* Given a list of keys in the order of preference, returns a value
* for the key in the given order from the configuration.
* @param defaultValue default value to return, when key was not found
* @param keySuffix suffix to add to the key, if it is not null
* @param conf Configuration
* @param keys list of keys in the order of preference
* @return value of the key or default if a key was not found in configuration
*/
private static String getConfValue(String defaultValue, String keySuffix,
Configuration conf, String... keys) {
String value = null;
for (String key : keys) {
key = addSuffix(key, keySuffix);
value = conf.get(key);
if (value != null) {
break;
}
}
if (value == null) {
value = defaultValue;
}
return value;
}
} }
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;

import java.net.URI;

import static org.apache.hadoop.hdfs.protocol.HdfsConstantsClient.HA_DT_SERVICE_PREFIX;

@InterfaceAudience.Private
public class HAUtilClient {
/**
* @return true if the given nameNodeUri appears to be a logical URI.
*/
public static boolean isLogicalUri(
Configuration conf, URI nameNodeUri) {
String host = nameNodeUri.getHost();
// A logical name must be one of the service IDs.
return DFSUtilClient.getNameServiceIds(conf).contains(host);
}

/**
* Check whether the client has a failover proxy provider configured
* for the namenode/nameservice.
*
* @param conf Configuration
* @param nameNodeUri The URI of namenode
* @return true if failover is configured.
*/
public static boolean isClientFailoverConfigured(
Configuration conf, URI nameNodeUri) {
String host = nameNodeUri.getHost();
String configKey = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
+ "." + host;
return conf.get(configKey) != null;
}

/**
* Get the service name used in the delegation token for the given logical
* HA service.
* @param uri the logical URI of the cluster
* @param scheme the scheme of the corresponding FileSystem
* @return the service name
*/
public static Text buildTokenServiceForLogicalUri(final URI uri,
final String scheme) {
return new Text(buildTokenServicePrefixForLogicalUri(scheme)
+ uri.getHost());
}

public static String buildTokenServicePrefixForLogicalUri(String scheme) {
return HA_DT_SERVICE_PREFIX + scheme + ":";
}

/**
* Parse the file system URI out of the provided token.
*/
public static URI getServiceUriFromToken(final String scheme, Token<?> token) {
String tokStr = token.getService().toString();
final String prefix = buildTokenServicePrefixForLogicalUri(
scheme);
if (tokStr.startsWith(prefix)) {
tokStr = tokStr.replaceFirst(prefix, "");
}
return URI.create(scheme + "://" + tokStr);
}

/**
* @return true if this token corresponds to a logical nameservice
* rather than a specific namenode.
*/
public static boolean isTokenForLogicalUri(Token<?> token) {
return token.getService().toString().startsWith(HA_DT_SERVICE_PREFIX);
}
}
Expand Up @@ -31,6 +31,12 @@ public interface HdfsClientConfigKeys {
"^(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?(,(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?)*$"; "^(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?(,(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?)*$";


static final String PREFIX = "dfs.client."; static final String PREFIX = "dfs.client.";
String DFS_NAMESERVICES = "dfs.nameservices";
int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 50470;
String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";


/** dfs.client.retry configuration properties */ /** dfs.client.retry configuration properties */
interface Retry { interface Retry {
Expand Down
Expand Up @@ -32,4 +32,10 @@ public interface HdfsConstantsClient {
*/ */
long GRANDFATHER_INODE_ID = 0; long GRANDFATHER_INODE_ID = 0;
byte BLOCK_STORAGE_POLICY_ID_UNSPECIFIED = 0; byte BLOCK_STORAGE_POLICY_ID_UNSPECIFIED = 0;
/**
* A prefix put before the namenode URI inside the "service" field
* of a delgation token, indicating that the URI is a logical (HA)
* URI.
*/
String HA_DT_SERVICE_PREFIX = "ha-";
} }
Expand Up @@ -23,7 +23,8 @@


@InterfaceAudience.Private @InterfaceAudience.Private
public class WebHdfsConstants { public class WebHdfsConstants {
/** Delegation token kind */ public static final String WEBHDFS_SCHEME = "webhdfs";
public static final String SWEBHDFS_SCHEME = "swebhdfs";
public static final Text WEBHDFS_TOKEN_KIND = new Text("WEBHDFS delegation"); public static final Text WEBHDFS_TOKEN_KIND = new Text("WEBHDFS delegation");
public static final Text SWEBHDFS_TOKEN_KIND = new Text("SWEBHDFS delegation"); public static final Text SWEBHDFS_TOKEN_KIND = new Text("SWEBHDFS delegation");


Expand Down
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Expand Up @@ -455,6 +455,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8133. Improve readability of deleted block check (Daryn Sharp via HDFS-8133. Improve readability of deleted block check (Daryn Sharp via
Colin P. McCabe) Colin P. McCabe)


HDFS-8185. Separate client related routines in HAUtil into a new class.
(wheat9)

OPTIMIZATIONS OPTIMIZATIONS


HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
Expand Down
Expand Up @@ -806,10 +806,10 @@ public void cancel(Token<?> token, Configuration conf) throws IOException {
private static ClientProtocol getNNProxy( private static ClientProtocol getNNProxy(
Token<DelegationTokenIdentifier> token, Configuration conf) Token<DelegationTokenIdentifier> token, Configuration conf)
throws IOException { throws IOException {
URI uri = HAUtil.getServiceUriFromToken(HdfsConstants.HDFS_URI_SCHEME, URI uri = HAUtilClient.getServiceUriFromToken(
token); HdfsConstants.HDFS_URI_SCHEME, token);
if (HAUtil.isTokenForLogicalUri(token) && if (HAUtilClient.isTokenForLogicalUri(token) &&
!HAUtil.isLogicalUri(conf, uri)) { !HAUtilClient.isLogicalUri(conf, uri)) {
// If the token is for a logical nameservice, but the configuration // If the token is for a logical nameservice, but the configuration
// we have disagrees about that, we can't actually renew it. // we have disagrees about that, we can't actually renew it.
// This can be the case in MR, for example, if the RM doesn't // This can be the case in MR, for example, if the RM doesn't
Expand Down
Expand Up @@ -102,8 +102,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.path.based.cache.block.map.allocation.percent"; "dfs.namenode.path.based.cache.block.map.allocation.percent";
public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f; public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;


public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070; public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT =
public static final String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address"; HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
public static final String DFS_NAMENODE_HTTP_ADDRESS_KEY =
HdfsClientConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT; public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;
public static final String DFS_NAMENODE_HTTP_BIND_HOST_KEY = "dfs.namenode.http-bind-host"; public static final String DFS_NAMENODE_HTTP_BIND_HOST_KEY = "dfs.namenode.http-bind-host";
public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address"; public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
Expand Down Expand Up @@ -302,8 +304,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {


//Following keys have no defaults //Following keys have no defaults
public static final String DFS_DATANODE_DATA_DIR_KEY = "dfs.datanode.data.dir"; public static final String DFS_DATANODE_DATA_DIR_KEY = "dfs.datanode.data.dir";
public static final int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 50470; public static final int DFS_NAMENODE_HTTPS_PORT_DEFAULT =
public static final String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address"; HdfsClientConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT;
public static final String DFS_NAMENODE_HTTPS_ADDRESS_KEY =
HdfsClientConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
public static final String DFS_NAMENODE_HTTPS_BIND_HOST_KEY = "dfs.namenode.https-bind-host"; public static final String DFS_NAMENODE_HTTPS_BIND_HOST_KEY = "dfs.namenode.https-bind-host";
public static final String DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTPS_PORT_DEFAULT; public static final String DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTPS_PORT_DEFAULT;
public static final String DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir"; public static final String DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir";
Expand Down Expand Up @@ -485,7 +489,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold"; public static final String DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
public static final int DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10; public static final int DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
public static final String DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY = "dfs.namenode.legacy-oiv-image.dir"; public static final String DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY = "dfs.namenode.legacy-oiv-image.dir";

public static final String DFS_NAMESERVICES = "dfs.nameservices"; public static final String DFS_NAMESERVICES = "dfs.nameservices";
public static final String DFS_NAMESERVICE_ID = "dfs.nameservice.id"; public static final String DFS_NAMESERVICE_ID = "dfs.nameservice.id";
public static final String DFS_INTERNAL_NAMESERVICES_KEY = "dfs.internal.nameservices"; public static final String DFS_INTERNAL_NAMESERVICES_KEY = "dfs.internal.nameservices";
Expand Down Expand Up @@ -513,7 +517,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT = true; public static final boolean DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT = true;


// HA related configuration // HA related configuration
public static final String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes"; public static final String DFS_HA_NAMENODES_KEY_PREFIX =
HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
public static final String DFS_HA_NAMENODE_ID_KEY = "dfs.ha.namenode.id"; public static final String DFS_HA_NAMENODE_ID_KEY = "dfs.ha.namenode.id";
public static final String DFS_HA_STANDBY_CHECKPOINTS_KEY = "dfs.ha.standby.checkpoints"; public static final String DFS_HA_STANDBY_CHECKPOINTS_KEY = "dfs.ha.standby.checkpoints";
public static final boolean DFS_HA_STANDBY_CHECKPOINTS_DEFAULT = true; public static final boolean DFS_HA_STANDBY_CHECKPOINTS_DEFAULT = true;
Expand Down

0 comments on commit 6f8003d

Please sign in to comment.