Skip to content

Commit

Permalink
HDDS-597. Ratis: Support secure gRPC endpoint with mTLS for Ratis. Co…
Browse files Browse the repository at this point in the history
…ntributed by Ajay Kumar.
  • Loading branch information
Ajay Kumar authored and xiaoyuyao committed Jan 17, 2019
1 parent 140565f commit 01a7f9e
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 41 deletions.
Expand Up @@ -133,7 +133,7 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken)
.getIpAddress(), port).usePlaintext()
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.intercept(new ClientCredentialInterceptor(userName, encodedToken));
if (SecurityConfig.isGrpcTlsEnabled(config)) {
if (secConfig.isGrpcTlsEnabled()) {
File trustCertCollectionFile = secConfig.getTrustStoreFile();
File privateKeyFile = secConfig.getClientPrivateKeyFile();
File clientCertChainFile = secConfig.getClientCertChainFile();
Expand Down
Expand Up @@ -21,6 +21,8 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.apache.ratis.retry.RetryPolicy;
Expand Down Expand Up @@ -69,16 +71,19 @@ public static XceiverClientRatis newXceiverClientRatis(
final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
SecurityConfig(ozoneConf));
return new XceiverClientRatis(pipeline,
SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
retryPolicy);
retryPolicy, tlsConfig);
}

private final Pipeline pipeline;
private final RpcType rpcType;
private final AtomicReference<RaftClient> client = new AtomicReference<>();
private final int maxOutstandingRequests;
private final RetryPolicy retryPolicy;
private final GrpcTlsConfig tlsConfig;

// Map to track commit index at every server
private final ConcurrentHashMap<String, Long> commitInfoMap;
Expand All @@ -90,14 +95,16 @@ public static XceiverClientRatis newXceiverClientRatis(
* Constructs a client.
*/
private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
int maxOutStandingChunks, RetryPolicy retryPolicy) {
int maxOutStandingChunks, RetryPolicy retryPolicy,
GrpcTlsConfig tlsConfig) {
super();
this.pipeline = pipeline;
this.rpcType = rpcType;
this.maxOutstandingRequests = maxOutStandingChunks;
this.retryPolicy = retryPolicy;
commitInfoMap = new ConcurrentHashMap<>();
watchClient = null;
this.tlsConfig = tlsConfig;
}

private void updateCommitInfosMap(
Expand Down Expand Up @@ -145,7 +152,8 @@ public void connect() throws Exception {
// maxOutstandingRequests so as to set the upper bound on max no of async
// requests to be handled by raft client
if (!client.compareAndSet(null,
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy))) {
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
maxOutstandingRequests, tlsConfig))) {
throw new IllegalStateException("Client is already connected.");
}
}
Expand Down Expand Up @@ -211,7 +219,8 @@ public long watchForCommit(long index, long timeout)
// create a new RaftClient instance for watch request
if (watchClient == null) {
watchClient =
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy);
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
maxOutstandingRequests, tlsConfig);
}
CompletableFuture<RaftClientReply> replyFuture = watchClient
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
Expand All @@ -229,7 +238,8 @@ public long watchForCommit(long index, long timeout)
// here once the watch request bypassing sliding window in Raft Client
// gets fixed.
watchClient =
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy);
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
maxOutstandingRequests, tlsConfig);
reply = watchClient
.sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
.get(timeout, TimeUnit.MILLISECONDS);
Expand Down
Expand Up @@ -98,16 +98,17 @@ public class SecurityConfig {
private final String publicKeyFileName;
private final Duration certDuration;
private final String x509SignatureAlgo;
private final Boolean grpcBlockTokenEnabled;
private final boolean grpcBlockTokenEnabled;
private final String certificateDir;
private final String certificateFileName;
private final Boolean grpcTlsEnabled;
private Boolean grpcTlsUseTestCert;
private final boolean grpcTlsEnabled;
private boolean grpcTlsUseTestCert;
private String trustStoreFileName;
private String serverCertChainFileName;
private String clientCertChainFileName;
private final Duration defaultCertDuration;
private final boolean isSecurityEnabled;
private boolean grpcMutualTlsRequired;

/**
* Constructs a SecurityConfig.
Expand Down Expand Up @@ -152,7 +153,10 @@ public SecurityConfig(Configuration configuration) {

this.grpcTlsEnabled = this.configuration.getBoolean(HDDS_GRPC_TLS_ENABLED,
HDDS_GRPC_TLS_ENABLED_DEFAULT);

if (grpcTlsEnabled) {
this.grpcMutualTlsRequired = configuration.getBoolean(
HDDS_GRPC_MUTUAL_TLS_REQUIRED, HDDS_GRPC_MUTUAL_TLS_REQUIRED_DEFAULT);

this.trustStoreFileName = this.configuration.get(
HDDS_TRUST_STORE_FILE_NAME, HDDS_TRUST_STORE_FILE_NAME_DEFAULT);
Expand Down Expand Up @@ -353,27 +357,24 @@ public Duration getMaxCertificateDuration() {
return this.certDuration;
}

public Boolean isGrpcBlockTokenEnabled() {
public boolean isGrpcBlockTokenEnabled() {
return this.grpcBlockTokenEnabled;
}

/**
* Returns true if TLS is enabled for gRPC services.
* @param conf configuration
* @return true if TLS is enabled for gRPC services.
*/
public static Boolean isGrpcTlsEnabled(Configuration conf) {
return conf.getBoolean(HDDS_GRPC_TLS_ENABLED,
HDDS_GRPC_TLS_ENABLED_DEFAULT);
public boolean isGrpcTlsEnabled() {
return this.grpcTlsEnabled;
}

/**
* Returns true if TLS mutual authentication is enabled for gRPC services.
* @return true if TLS is enabled for gRPC services.
*/
public Boolean isGrpcMutualTlsRequired() {
return configuration.getBoolean(HDDS_GRPC_MUTUAL_TLS_REQUIRED,
HDDS_GRPC_MUTUAL_TLS_REQUIRED_DEFAULT);
public boolean isGrpcMutualTlsRequired() {
return this.grpcMutualTlsRequired;
}

/**
Expand Down
65 changes: 54 additions & 11 deletions hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
Expand Up @@ -21,18 +21,22 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcFactory;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.util.SizeInBytes;
Expand Down Expand Up @@ -128,38 +132,77 @@ static RaftGroup newRaftGroup(Pipeline pipeline) {
}

static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline,
RetryPolicy retryPolicy) throws IOException {
RetryPolicy retryPolicy, int maxOutStandingRequest,
GrpcTlsConfig tlsConfig) throws IOException {
return newRaftClient(rpcType, toRaftPeerId(pipeline.getFirstNode()),
newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
pipeline.getNodes()), retryPolicy);
pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig);
}

static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
RetryPolicy retryPolicy) {
RetryPolicy retryPolicy, int maxOutstandingRequests,
GrpcTlsConfig tlsConfig) {
return newRaftClient(rpcType, leader.getId(),
newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy);
newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy,
maxOutstandingRequests, tlsConfig);
}

static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
RaftGroup group, RetryPolicy retryPolicy) {
return newRaftClient(rpcType, leader.getId(), group, retryPolicy);
RetryPolicy retryPolicy, int maxOutstandingRequests) {
return newRaftClient(rpcType, leader.getId(),
newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy,
maxOutstandingRequests, null);
}

static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader,
RaftGroup group, RetryPolicy retryPolicy) {
RaftGroup group, RetryPolicy retryPolicy, int maxOutStandingRequest,
GrpcTlsConfig tlsConfig) {
LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group);
final RaftProperties properties = new RaftProperties();
RaftConfigKeys.Rpc.setType(properties, rpcType);

GrpcConfigKeys.setMessageSizeMax(properties,
SizeInBytes.valueOf(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE));

return RaftClient.newBuilder()
GrpcConfigKeys.OutputStream.setOutstandingAppendsMax(properties,
maxOutStandingRequest);
RaftClient.Builder builder = RaftClient.newBuilder()
.setRaftGroup(group)
.setLeaderId(leader)
.setProperties(properties)
.setRetryPolicy(retryPolicy)
.build();
.setRetryPolicy(retryPolicy);

// TODO: GRPC TLS only for now, netty/hadoop RPC TLS support later.
if (tlsConfig != null && rpcType == SupportedRpcType.GRPC) {
builder.setParameters(GrpcFactory.newRaftParameters(tlsConfig));
}
return builder.build();
}

static GrpcTlsConfig createTlsClientConfig(SecurityConfig conf) {
if (conf.isGrpcTlsEnabled()) {
if (conf.isGrpcMutualTlsRequired()) {
return new GrpcTlsConfig(
null, null, conf.getTrustStoreFile(), false);
} else {
return new GrpcTlsConfig(conf.getClientPrivateKeyFile(),
conf.getClientCertChainFile(), conf.getTrustStoreFile(), true);
}
}
return null;
}

static GrpcTlsConfig createTlsServerConfig(SecurityConfig conf) {
if (conf.isGrpcTlsEnabled()) {
if (conf.isGrpcMutualTlsRequired()) {
return new GrpcTlsConfig(
conf.getServerPrivateKeyFile(), conf.getServerCertChainFile(), null,
false);
} else {
return new GrpcTlsConfig(conf.getServerPrivateKeyFile(),
conf.getServerCertChainFile(), conf.getClientCertChainFile(), true);
}
}
return null;
}

static RetryPolicy createRetryPolicy(Configuration conf) {
Expand Down
Expand Up @@ -115,7 +115,7 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
nettyServerBuilder.addService(service);
}

if (SecurityConfig.isGrpcTlsEnabled(conf)) {
if (secConfig.isGrpcTlsEnabled()) {
File privateKeyFilePath = secConfig.getServerPrivateKeyFile();
File serverCertChainFilePath = secConfig.getServerCertChainFile();
File clientCertChainFilePath = secConfig.getClientCertChainFile();
Expand Down
Expand Up @@ -33,6 +33,7 @@
.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
Expand All @@ -44,6 +45,8 @@
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcFactory;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.Message;
Expand All @@ -66,6 +69,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.HEAD;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -107,9 +111,9 @@ private static long nextCallId() {
private long nodeFailureTimeoutMs;
private final long cacheEntryExpiryInteval;


private XceiverServerRatis(DatanodeDetails dd, int port,
ContainerDispatcher dispatcher, Configuration conf, StateContext context)
ContainerDispatcher dispatcher, Configuration conf, StateContext
context, GrpcTlsConfig tlsConfig)
throws IOException {
Objects.requireNonNull(dd, "id == null");
this.port = port;
Expand Down Expand Up @@ -139,12 +143,14 @@ private XceiverServerRatis(DatanodeDetails dd, int port,
for (int i = 0; i < numContainerOpExecutors; i++) {
executors.add(Executors.newSingleThreadExecutor());
}

this.server = RaftServer.newBuilder()
RaftServer.Builder builder = RaftServer.newBuilder()
.setServerId(RatisHelper.toRaftPeerId(dd))
.setProperties(serverProperties)
.setStateMachineRegistry(this::getStateMachine)
.build();
.setStateMachineRegistry(this::getStateMachine);
if (tlsConfig != null) {
builder.setParameters(GrpcFactory.newRaftParameters(tlsConfig));
}
this.server = builder.build();
}

private ContainerStateMachine getStateMachine(RaftGroupId gid) {
Expand Down Expand Up @@ -405,10 +411,12 @@ public static XceiverServerRatis newXceiverServerRatis(
+ "fallback to use default port {}", localPort, e);
}
}
GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfig(
new SecurityConfig(ozoneConf));
datanodeDetails.setPort(
DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort));
return new XceiverServerRatis(datanodeDetails, localPort,
dispatcher, ozoneConf, context);
dispatcher, ozoneConf, context, tlsConfig);
}

@Override
Expand Down
Expand Up @@ -242,8 +242,9 @@ private Container createContainer(final Configuration conf,
final RaftPeer peer = RatisHelper.toRaftPeer(datanodeDetails);
final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId,
Collections.singleton(datanodeDetails));
final RaftClient client = RatisHelper.newRaftClient(
SupportedRpcType.GRPC, peer, retryPolicy);
final int maxOutstandingRequests = 100;
final RaftClient client = RatisHelper.newRaftClient(SupportedRpcType.GRPC,
peer, retryPolicy, maxOutstandingRequests, null);
Assert.assertTrue(client.groupAdd(group, peer.getId()).isSuccess());
Thread.sleep(2000);
final ContainerID containerId = ContainerID.valueof(
Expand Down

0 comments on commit 01a7f9e

Please sign in to comment.