Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-3101. Depend on lightweight ConfigurationSource interface instead of Hadoop Configuration #834

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,23 @@

package org.apache.hadoop.hdds.scm;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
Expand All @@ -41,6 +54,8 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.util.Time;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
Expand All @@ -52,28 +67,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* A Client for the storageContainer protocol for read object data.
*/
public class XceiverClientGrpc extends XceiverClientSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
private final Pipeline pipeline;
private final Configuration config;
private final ConfigurationSource config;
private Map<UUID, XceiverClientProtocolServiceStub> asyncStubs;
private XceiverClientMetrics metrics;
private Map<UUID, ManagedChannel> channels;
Expand All @@ -94,7 +94,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
* @param config -- Ozone Config
* @param caCert - SCM ca certificate.
*/
public XceiverClientGrpc(Pipeline pipeline, Configuration config,
public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
X509Certificate caCert) {
super();
Preconditions.checkNotNull(pipeline);
Expand All @@ -121,7 +121,7 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config,
* @param pipeline - Pipeline that defines the machines.
* @param config -- Ozone Config
*/
public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config) {
this(pipeline, config, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
Expand Down Expand Up @@ -69,7 +70,7 @@ public class XceiverClientManager implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(XceiverClientManager.class);
//TODO : change this to SCM configuration class
private final Configuration conf;
private final ConfigurationSource conf;
private final Cache<String, XceiverClientSpi> clientCache;
private X509Certificate caCert;

Expand All @@ -83,12 +84,13 @@ public class XceiverClientManager implements Closeable {
*
* @param conf configuration
*/
public XceiverClientManager(Configuration conf) throws IOException {
public XceiverClientManager(ConfigurationSource conf) throws IOException {
this(conf, OzoneConfiguration.of(conf).getObject(ScmClientConfig.class),
null);
}

public XceiverClientManager(Configuration conf, ScmClientConfig clientConf,
public XceiverClientManager(ConfigurationSource conf,
ScmClientConfig clientConf,
String caCertPem) throws IOException {
Preconditions.checkNotNull(clientConf);
Preconditions.checkNotNull(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,25 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdds.ratis.RatisHelper;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
Expand All @@ -61,12 +66,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;

/**
* An abstract implementation of {@link XceiverClientSpi} using Ratis.
* The underlying RPC mechanism can be chosen via the constructor.
Expand All @@ -77,13 +76,13 @@ public final class XceiverClientRatis extends XceiverClientSpi {

public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
Configuration ozoneConf) {
ConfigurationSource ozoneConf) {
return newXceiverClientRatis(pipeline, ozoneConf, null);
}

public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
Configuration ozoneConf, X509Certificate caCert) {
ConfigurationSource ozoneConf, X509Certificate caCert) {
final String rpcType = ozoneConf
.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
Expand All @@ -100,7 +99,7 @@ public static XceiverClientRatis newXceiverClientRatis(
private final AtomicReference<RaftClient> client = new AtomicReference<>();
private final RetryPolicy retryPolicy;
private final GrpcTlsConfig tlsConfig;
private final Configuration ozoneConfiguration;
private final ConfigurationSource ozoneConfiguration;

// Map to track commit index at every server
private final ConcurrentHashMap<UUID, Long> commitInfoMap;
Expand All @@ -112,7 +111,7 @@ public static XceiverClientRatis newXceiverClientRatis(
*/
private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
Configuration configuration) {
ConfigurationSource configuration) {
super();
this.pipeline = pipeline;
this.rpcType = rpcType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.RatisClientConfig;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
Expand Down Expand Up @@ -213,17 +213,16 @@ public static <T> void checkNotNull(T... references) {
* @param conf Configuration object
* @return list cache size
*/
public static int getListCacheSize(Configuration conf) {
public static int getListCacheSize(ConfigurationSource conf) {
return conf.getInt(OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE,
OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE_DEFAULT);
}


/**
* Returns the maximum no of outstanding async requests to be handled by
* Standalone and Ratis client.
*/
public static int getMaxOutstandingRequests(Configuration config) {
public static int getMaxOutstandingRequests(ConfigurationSource config) {
return OzoneConfiguration.of(config)
.getObject(RatisClientConfig.class)
.getMaxOutstandingRequests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
import java.util.OptionalInt;
import java.util.TimeZone;

import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
Expand Down Expand Up @@ -89,7 +89,8 @@ private HddsUtils() {
*
* @return Target {@code InetSocketAddress} for the SCM client endpoint.
*/
public static InetSocketAddress getScmAddressForClients(Configuration conf) {
public static InetSocketAddress getScmAddressForClients(
ConfigurationSource conf) {
Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);

Expand All @@ -116,7 +117,7 @@ public static InetSocketAddress getScmAddressForClients(Configuration conf) {
* @throws IllegalArgumentException if configuration is not defined.
*/
public static InetSocketAddress getScmAddressForBlockClients(
Configuration conf) {
ConfigurationSource conf) {
Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
Expand Down Expand Up @@ -147,7 +148,8 @@ public static InetSocketAddress getScmAddressForBlockClients(
* @throws IllegalArgumentException if any values are not in the 'host'
* or host:port format.
*/
public static Optional<String> getHostNameFromConfigKeys(Configuration conf,
public static Optional<String> getHostNameFromConfigKeys(
ConfigurationSource conf,
String... keys) {
for (final String key : keys) {
final String value = conf.getTrimmed(key);
Expand Down Expand Up @@ -206,7 +208,7 @@ public static OptionalInt getHostPort(String value) {
* or host:port format.
*/
public static OptionalInt getPortNumberFromConfigKeys(
Configuration conf, String... keys) {
ConfigurationSource conf, String... keys) {
for (final String key : keys) {
final String value = conf.getTrimmed(key);
final OptionalInt hostPort = getHostPort(value);
Expand All @@ -224,7 +226,7 @@ public static OptionalInt getPortNumberFromConfigKeys(
* @throws IllegalArgumentException If the configuration is invalid
*/
public static Collection<InetSocketAddress> getSCMAddresses(
Configuration conf) {
ConfigurationSource conf) {
Collection<String> names =
conf.getTrimmedStringCollection(ScmConfigKeys.OZONE_SCM_NAMES);
if (names.isEmpty()) {
Expand Down Expand Up @@ -255,7 +257,7 @@ public static Collection<InetSocketAddress> getSCMAddresses(
* @throws IllegalArgumentException If the configuration is invalid
*/
public static InetSocketAddress getReconAddresses(
Configuration conf) {
ConfigurationSource conf) {
String name = conf.get(OZONE_RECON_ADDRESS_KEY);
if (StringUtils.isEmpty(name)) {
return null;
Expand All @@ -277,7 +279,8 @@ public static InetSocketAddress getReconAddresses(
* @throws IllegalArgumentException if {@code conf} has more than one SCM
* address or it has none
*/
public static InetSocketAddress getSingleSCMAddress(Configuration conf) {
public static InetSocketAddress getSingleSCMAddress(
ConfigurationSource conf) {
Collection<InetSocketAddress> singleton = getSCMAddresses(conf);
Preconditions.checkArgument(singleton.size() == 1,
MULTIPLE_SCM_NOT_YET_SUPPORTED);
Expand All @@ -295,7 +298,7 @@ public static InetSocketAddress getSingleSCMAddress(Configuration conf) {
* @throws UnknownHostException if the dfs.datanode.dns.interface
* option is used and the hostname can not be determined
*/
public static String getHostName(Configuration conf)
public static String getHostName(ConfigurationSource conf)
throws UnknownHostException {
String name = conf.get(DFS_DATANODE_HOST_NAME_KEY);
if (name == null) {
Expand Down Expand Up @@ -498,7 +501,7 @@ public static void validatePath(Path path, Path ancestor) {
* @param alias name of the credential to retreive
* @return String credential value or null
*/
static String getPassword(Configuration conf, String alias) {
static String getPassword(ConfigurationSource conf, String alias) {
String password = null;
try {
char[] passchars = conf.getPassword(alias);
Expand Down