diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java index bbe25a9d840c..95d6064ffab9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java @@ -35,6 +35,16 @@ public SCMSecurityException(String message) { this.errorCode = ErrorCode.DEFAULT; } + /** + * Ctor. + * @param message - Error Message + * @param errorCode - Error code + */ + public SCMSecurityException(String message, ErrorCode errorCode) { + super(message); + this.errorCode = errorCode; + } + /** * Ctor. * @param message - Message. @@ -47,11 +57,23 @@ public SCMSecurityException(String message, Throwable cause) { /** * Ctor. - * @param message - Message. + * @param message - Error Message + * @param cause - Actual cause. + * @param errorCode - Error code. + */ + public SCMSecurityException(String message, Throwable cause, + ErrorCode errorCode) { + super(message, cause); + this.errorCode = errorCode; + } + + /** + * Ctor. + * @param cause - Actual cause. * @param error - error code. */ - public SCMSecurityException(String message, ErrorCode error) { - super(message); + public SCMSecurityException(Exception cause, ErrorCode error) { + super(cause); this.errorCode = error; } @@ -72,6 +94,17 @@ public ErrorCode getErrorCode() { * Error codes to make it easy to decode these exceptions. */ public enum ErrorCode { + OK, + INVALID_CSR, + UNABLE_TO_ISSUE_CERTIFICATE, + GET_DN_CERTIFICATE_FAILED, + GET_OM_CERTIFICATE_FAILED, + GET_SCM_CERTIFICATE_FAILED, + GET_CERTIFICATE_FAILED, + GET_CA_CERT_FAILED, + CERTIFICATE_NOT_FOUND, + PEM_ENCODE_FAILED, + INTERNAL_ERROR, DEFAULT, MISSING_BLOCK_TOKEN, BLOCK_TOKEN_VERIFICATION_FAILED diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java index 1abdcc30eb3c..53d8e9ac895e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java @@ -50,6 +50,7 @@ import static java.nio.file.attribute.PosixFilePermission.OWNER_EXECUTE; import static java.nio.file.attribute.PosixFilePermission.OWNER_READ; import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE; +import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.PEM_ENCODE_FAILED; /** * A class used to read and write X.509 certificates PEM encoded Streams. @@ -125,7 +126,7 @@ public static String getPEMEncodedString(X509Certificate certificate) LOG.error("Error in encoding certificate." + certificate .getSubjectDN().toString(), e); throw new SCMSecurityException("PEM Encoding failed for certificate." + - certificate.getSubjectDN().toString(), e); + certificate.getSubjectDN().toString(), e, PEM_ENCODE_FAILED); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java index 672b95e2fc0d..f54d228eb2a7 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.function.Consumer; +import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; @@ -37,7 +38,10 @@ import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityRequest.Builder; import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityResponse; import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.Type; +import org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider; +import org.apache.hadoop.hdds.security.exception.SCMSecurityException; import org.apache.hadoop.hdds.tracing.TracingUtil; +import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.RPC; @@ -58,12 +62,22 @@ public class SCMSecurityProtocolClientSideTranslatorPB implements */ private static final RpcController NULL_RPC_CONTROLLER = null; private final SCMSecurityProtocolPB rpcProxy; + private SCMSecurityProtocolFailoverProxyProvider failoverProxyProvider; public SCMSecurityProtocolClientSideTranslatorPB( SCMSecurityProtocolPB rpcProxy) { this.rpcProxy = rpcProxy; } + public SCMSecurityProtocolClientSideTranslatorPB( + SCMSecurityProtocolFailoverProxyProvider proxyProvider) { + Preconditions.checkState(proxyProvider != null); + this.failoverProxyProvider = proxyProvider; + this.rpcProxy = (SCMSecurityProtocolPB) RetryProxy.create( + SCMSecurityProtocolPB.class, failoverProxyProvider, + failoverProxyProvider.getRetryPolicy()); + } + /** * Helper method to wrap the request and send the message. */ @@ -80,12 +94,29 @@ private SCMSecurityResponse submitRequest( SCMSecurityRequest wrapper = builder.build(); response = rpcProxy.submitRequest(NULL_RPC_CONTROLLER, wrapper); + + handleError(response); + } catch (ServiceException ex) { throw ProtobufHelper.getRemoteException(ex); } return response; } + /** + * If response is not successful, throw exception. + * @param resp - SCMSecurityResponse + * @return if response is success, return response, else throw exception. + * @throws SCMSecurityException + */ + private SCMSecurityResponse handleError(SCMSecurityResponse resp) + throws SCMSecurityException { + if (resp.getStatus() != SCMSecurityProtocolProtos.Status.OK) { + throw new SCMSecurityException(resp.getMessage(), + SCMSecurityException.ErrorCode.values()[resp.getStatus().ordinal()]); + } + return resp; + } /** * Closes this stream and releases any system resources associated * with it. If the stream is already closed then invoking this diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java new file mode 100644 index 000000000000..a2d2fb35e4a4 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.proxy; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.ConfigurationException; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB; +import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo; +import org.apache.hadoop.hdds.utils.HAUtils; +import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY; + +/** + * Failover proxy provider for SCMSecurityProtocol server. + */ +public class SCMSecurityProtocolFailoverProxyProvider implements + FailoverProxyProvider, Closeable { + + public static final Logger LOG = + LoggerFactory.getLogger(SCMSecurityProtocolFailoverProxyProvider.class); + + // scmNodeId -> ProxyInfo + private final Map> scmProxies; + + // scmNodeId -> SCMProxyInfo + private final Map scmProxyInfoMap; + + private List scmNodeIds; + + private String currentProxySCMNodeId; + private int currentProxyIndex; + + private final ConfigurationSource conf; + private final SCMClientConfig scmClientConfig; + private final long scmVersion; + + private String scmServiceId; + + private final int maxRetryCount; + private final long retryInterval; + + private final UserGroupInformation ugi; + + /** + * Construct fail-over proxy provider for SCMSecurityProtocol Server. + * @param conf + * @param userGroupInformation + */ + public SCMSecurityProtocolFailoverProxyProvider(ConfigurationSource conf, + UserGroupInformation userGroupInformation) { + Preconditions.checkNotNull(userGroupInformation); + this.ugi = userGroupInformation; + this.conf = conf; + this.scmVersion = RPC.getProtocolVersion(SCMSecurityProtocolPB.class); + + this.scmProxies = new HashMap<>(); + this.scmProxyInfoMap = new HashMap<>(); + loadConfigs(); + + this.currentProxyIndex = 0; + currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); + scmClientConfig = conf.getObject(SCMClientConfig.class); + this.maxRetryCount = scmClientConfig.getRetryCount(); + this.retryInterval = scmClientConfig.getRetryInterval(); + } + + protected void loadConfigs() { + List scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf); + scmNodeIds = new ArrayList<>(); + + for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) { + if (scmNodeInfo.getScmSecurityAddress() == null) { + throw new ConfigurationException("SCM Client Address could not " + + "be obtained from config. Config is not properly defined"); + } else { + InetSocketAddress scmSecurityAddress = + NetUtils.createSocketAddr(scmNodeInfo.getScmSecurityAddress()); + + scmServiceId = scmNodeInfo.getServiceId(); + String scmNodeId = scmNodeInfo.getNodeId(); + + scmNodeIds.add(scmNodeId); + SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId, + scmSecurityAddress); + scmProxyInfoMap.put(scmNodeId, scmProxyInfo); + } + } + } + + @Override + public synchronized ProxyInfo getProxy() { + ProxyInfo currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId()); + if (currentProxyInfo == null) { + currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId()); + } + return currentProxyInfo; + } + + /** + * Creates proxy object. + */ + private ProxyInfo createSCMProxy(String nodeId) { + ProxyInfo proxyInfo; + SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId); + InetSocketAddress address = scmProxyInfo.getAddress(); + try { + SCMSecurityProtocolPB scmProxy = createSCMProxy(address); + // Create proxyInfo here, to make it work with all Hadoop versions. + proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString()); + scmProxies.put(nodeId, proxyInfo); + return proxyInfo; + } catch (IOException ioe) { + LOG.error("{} Failed to create RPC proxy to SCM at {}", + this.getClass().getSimpleName(), address, ioe); + throw new RuntimeException(ioe); + } + } + + private SCMSecurityProtocolPB createSCMProxy(InetSocketAddress scmAddress) + throws IOException { + Configuration hadoopConf = + LegacyHadoopConfigurationSource.asHadoopConfiguration(conf); + RPC.setProtocolEngine(hadoopConf, SCMSecurityProtocolPB.class, + ProtobufRpcEngine.class); + + // FailoverOnNetworkException ensures that the IPC layer does not attempt + // retries on the same SCM in case of connection exception. This retry + // policy essentially results in TRY_ONCE_THEN_FAIL. + + RetryPolicy connectionRetryPolicy = RetryPolicies + .failoverOnNetworkException(0); + + return RPC.getProtocolProxy(SCMSecurityProtocolPB.class, + scmVersion, scmAddress, ugi, + hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf), + (int)scmClientConfig.getRpcTimeOut(), connectionRetryPolicy).getProxy(); + } + + + @Override + public void performFailover(SCMSecurityProtocolPB currentProxy) { + if (LOG.isDebugEnabled()) { + int currentIndex = getCurrentProxyIndex(); + LOG.debug("Failing over SCM Security proxy to index: {}, nodeId: {}", + currentIndex, scmNodeIds.get(currentIndex)); + } + } + + /** + * Performs fail-over to the next proxy. + */ + public void performFailoverToNextProxy() { + int newProxyIndex = incrementProxyIndex(); + if (LOG.isDebugEnabled()) { + LOG.debug("Incrementing SCM Security proxy index to {}, nodeId: {}", + newProxyIndex, scmNodeIds.get(newProxyIndex)); + } + } + + /** + * Update the proxy index to the next proxy in the list. + * @return the new proxy index + */ + private synchronized int incrementProxyIndex() { + currentProxyIndex = (currentProxyIndex + 1) % scmProxies.size(); + currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); + return currentProxyIndex; + } + + public RetryPolicy getRetryPolicy() { + // Client will attempt up to maxFailovers number of failovers between + // available SCMs before throwing exception. + RetryPolicy retryPolicy = new RetryPolicy() { + @Override + public RetryAction shouldRetry(Exception exception, int retries, + int failovers, boolean isIdempotentOrAtMostOnce) + throws Exception { + + if (LOG.isDebugEnabled()) { + if (exception.getCause() != null) { + LOG.debug("RetryProxy: SCM Security Server {}: {}: {}", + getCurrentProxySCMNodeId(), + exception.getCause().getClass().getSimpleName(), + exception.getCause().getMessage()); + } else { + LOG.debug("RetryProxy: SCM {}: {}", getCurrentProxySCMNodeId(), + exception.getMessage()); + } + } + + // For AccessControl Exception where Client is not authentica + if (HAUtils.isAccessControlException(exception)) { + return RetryAction.FAIL; + } + + // Perform fail over to next proxy, as right now we don't have any + // suggested leader ID from server, we fail over to next one. + // TODO: Act based on server response if leader id is passed. + performFailoverToNextProxy(); + return getRetryAction(FAILOVER_AND_RETRY, failovers); + } + + private RetryAction getRetryAction(RetryDecision fallbackAction, + int failovers) { + if (failovers < maxRetryCount) { + return new RetryAction(fallbackAction, getRetryInterval()); + } else { + return RetryAction.FAIL; + } + } + }; + + return retryPolicy; + } + + + @Override + public Class< SCMSecurityProtocolPB > getInterface() { + return SCMSecurityProtocolPB.class; + } + + @Override + public void close() throws IOException { + for (Map.Entry> proxy : + scmProxies.entrySet()) { + if (proxy.getValue() != null) { + RPC.stopProxy(proxy.getValue()); + } + scmProxies.remove(proxy.getKey()); + } + } + + public synchronized String getCurrentProxySCMNodeId() { + return currentProxySCMNodeId; + } + + public synchronized int getCurrentProxyIndex() { + return currentProxyIndex; + } + + private long getRetryInterval() { + return retryInterval; + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java index 440bd4ca8211..2cd89935d888 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java @@ -63,6 +63,7 @@ import java.util.function.Consumer; import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getCertificationRequest; +import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.UNABLE_TO_ISSUE_CERTIFICATE; import static org.apache.hadoop.hdds.security.x509.exceptions.CertificateException.ErrorCode.CSR_ERROR; /** @@ -254,7 +255,8 @@ public Future requestCertificate( } } catch (CertificateException | IOException | OperatorCreationException e) { LOG.error("Unable to issue a certificate.", e); - xcertHolder.completeExceptionally(new SCMSecurityException(e)); + xcertHolder.completeExceptionally( + new SCMSecurityException(e, UNABLE_TO_ISSUE_CERTIFICATE)); } return xcertHolder; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java index b26ad2cbc29d..b8d2859eed3e 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java @@ -59,6 +59,8 @@ import org.bouncycastle.util.io.pem.PemObject; import org.bouncycastle.util.io.pem.PemReader; +import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.INVALID_CSR; + /** * A certificate sign request object that wraps operations to build a * PKCS10CertificationRequest to CertificateServer. @@ -134,7 +136,8 @@ public static PKCS10CertificationRequest getCertificationRequest(String csr) try (PemReader reader = new PemReader(new StringReader(csr))) { PemObject pemObject = reader.readPemObject(); if(pemObject.getContent() == null) { - throw new SCMSecurityException("Invalid Certificate signing request"); + throw new SCMSecurityException("Invalid Certificate signing request", + INVALID_CSR); } return new PKCS10CertificationRequest(pemObject.getContent()); } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java index 4632b36901a3..36d6bab0a1c2 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.utils; import org.apache.hadoop.hdds.HddsConfigKeys; +import com.google.protobuf.ServiceException; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.AddSCMRequest; import org.apache.hadoop.hdds.scm.ScmInfo; @@ -35,6 +36,8 @@ import org.apache.ratis.util.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.AccessControlException; import java.io.File; import java.io.IOException; @@ -285,4 +288,24 @@ public static File getMetaDir(DBDefinition definition, } return metadataDir; } + + /** + * Unwrap exception to check if it is some kind of access control problem. + * {@link AccessControlException} + */ + public static boolean isAccessControlException(Exception ex) { + if (ex instanceof ServiceException) { + Throwable t = ex.getCause(); + if (t instanceof RemoteException) { + t = ((RemoteException) t).unwrapRemoteException(); + } + while (t != null) { + if (t instanceof AccessControlException) { + return true; + } + t = t.getCause(); + } + } + return false; + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java index 9e6ef22b9a94..ddc7e04b0652 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java @@ -46,16 +46,13 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol; import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB; import org.apache.hadoop.hdds.recon.ReconConfigKeys; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; -import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; +import org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider; import org.apache.hadoop.hdds.server.ServerUtils; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.hdds.utils.db.DBCheckpoint; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.metrics2.MetricsException; @@ -435,20 +432,9 @@ public static String getDatanodeIdFilePath(ConfigurationSource conf) { */ public static SCMSecurityProtocolClientSideTranslatorPB getScmSecurityClient( OzoneConfiguration conf) throws IOException { - RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class, - ProtobufRpcEngine.class); - long scmVersion = - RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class); - InetSocketAddress address = - getScmAddressForSecurityProtocol(conf); - RetryPolicy retryPolicy = - RetryPolicies.retryForeverWithFixedSleep( - 1000, TimeUnit.MILLISECONDS); return new SCMSecurityProtocolClientSideTranslatorPB( - RPC.getProtocolProxy(SCMSecurityProtocolPB.class, scmVersion, - address, UserGroupInformation.getCurrentUser(), - conf, NetUtils.getDefaultSocketFactory(conf), - Client.getRpcTimeout(conf), retryPolicy).getProxy()); + new SCMSecurityProtocolFailoverProxyProvider(conf, + UserGroupInformation.getCurrentUser())); } @@ -489,17 +475,11 @@ public static InetSocketAddress getScmAddressForSecurityProtocol( */ public static SCMSecurityProtocol getScmSecurityClient( OzoneConfiguration conf, UserGroupInformation ugi) throws IOException { - RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class, - ProtobufRpcEngine.class); - long scmVersion = - RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class); - InetSocketAddress scmSecurityProtoAdd = - getScmAddressForSecurityProtocol(conf); - return new SCMSecurityProtocolClientSideTranslatorPB( - RPC.getProxy(SCMSecurityProtocolPB.class, scmVersion, - scmSecurityProtoAdd, ugi, conf, - NetUtils.getDefaultSocketFactory(conf), - Client.getRpcTimeout(conf))); + SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient = + new SCMSecurityProtocolClientSideTranslatorPB( + new SCMSecurityProtocolFailoverProxyProvider(conf, ugi)); + return TracingUtil.createProxy(scmSecurityClient, + SCMSecurityProtocol.class, conf); } /** diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto index 04559524f9e0..48c6cf978b23 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto @@ -83,6 +83,19 @@ enum Type { enum Status { OK = 1; + INVALID_CSR = 2; + UNABLE_TO_ISSUE_CERTIFICATE = 3; + GET_DN_CERTIFICATE_FAILED = 4; + GET_OM_CERTIFICATE_FAILED = 5; + GET_SCM_CERTIFICATE_FAILED = 6; + GET_CERTIFICATE_FAILED = 7; + GET_CA_CERT_FAILED = 8; + CERTIFICATE_NOT_FOUND = 9; + PEM_ENCODE_FAILED = 10; + INTERNAL_ERROR = 11; + DEFAULT = 12; + MISSING_BLOCK_TOKEN = 13; + BLOCK_TOKEN_VERIFICATION_FAILED = 14; } /** * This message is send by data node to prove its identity and get an SCM diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java index babc87b36a39..06da6e42d0e2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityResponse; import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.Status; import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.security.exception.SCMSecurityException; import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher; import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics; @@ -54,14 +56,17 @@ public class SCMSecurityProtocolServerSideTranslatorPB LoggerFactory.getLogger(SCMSecurityProtocolServerSideTranslatorPB.class); private final SCMSecurityProtocol impl; + private final StorageContainerManager scm; private OzoneProtocolMessageDispatcher dispatcher; public SCMSecurityProtocolServerSideTranslatorPB(SCMSecurityProtocol impl, + StorageContainerManager storageContainerManager, ProtocolMessageMetrics messageMetrics) { this.impl = impl; + this.scm = storageContainerManager; this.dispatcher = new OzoneProtocolMessageDispatcher<>("ScmSecurityProtocol", messageMetrics, LOG); @@ -70,62 +75,73 @@ public SCMSecurityProtocolServerSideTranslatorPB(SCMSecurityProtocol impl, @Override public SCMSecurityResponse submitRequest(RpcController controller, SCMSecurityRequest request) throws ServiceException { + if (!scm.checkLeader()) { + throw new ServiceException(scm.getScmHAManager() + .getRatisServer() + .triggerNotLeaderException()); + } return dispatcher.processRequest(request, this::processRequest, request.getCmdType(), request.getTraceID()); } - public SCMSecurityResponse processRequest(SCMSecurityRequest request) - throws ServiceException { + public SCMSecurityResponse processRequest(SCMSecurityRequest request) { + SCMSecurityResponse.Builder scmSecurityResponse = + SCMSecurityResponse.newBuilder().setCmdType(request.getCmdType()) + .setStatus(Status.OK); try { switch (request.getCmdType()) { case GetCertificate: - return SCMSecurityResponse.newBuilder() - .setCmdType(request.getCmdType()) - .setStatus(Status.OK) - .setGetCertResponseProto( - getCertificate(request.getGetCertificateRequest())) - .build(); + return scmSecurityResponse.setGetCertResponseProto( + getCertificate(request.getGetCertificateRequest())).build(); case GetCACertificate: - return SCMSecurityResponse.newBuilder() - .setCmdType(request.getCmdType()) - .setStatus(Status.OK) - .setGetCertResponseProto( - getCACertificate(request.getGetCACertificateRequest())) - .build(); + return scmSecurityResponse.setGetCertResponseProto( + getCACertificate(request.getGetCACertificateRequest())).build(); case GetOMCertificate: - return SCMSecurityResponse.newBuilder() - .setCmdType(request.getCmdType()) - .setStatus(Status.OK) - .setGetCertResponseProto( - getOMCertificate(request.getGetOMCertRequest())) + return scmSecurityResponse.setGetCertResponseProto( + getOMCertificate(request.getGetOMCertRequest())) .build(); case GetDataNodeCertificate: - return SCMSecurityResponse.newBuilder() - .setCmdType(request.getCmdType()) - .setStatus(Status.OK) - .setGetCertResponseProto( - getDataNodeCertificate(request.getGetDataNodeCertRequest())) + return scmSecurityResponse.setGetCertResponseProto( + getDataNodeCertificate(request.getGetDataNodeCertRequest())) .build(); case ListCertificate: - return SCMSecurityResponse.newBuilder() - .setCmdType(request.getCmdType()) - .setStatus(Status.OK) - .setListCertificateResponseProto( - listCertificate(request.getListCertificateRequest())) + return scmSecurityResponse.setListCertificateResponseProto( + listCertificate(request.getListCertificateRequest())) .build(); case GetSCMCertificate: - return SCMSecurityResponse.newBuilder() - .setCmdType(request.getCmdType()) - .setStatus(Status.OK) - .setGetCertResponseProto(getSCMCertificate( - request.getGetSCMCertificateRequest())) - .build(); + return scmSecurityResponse.setGetCertResponseProto(getSCMCertificate( + request.getGetSCMCertificateRequest())).build(); default: throw new IllegalArgumentException( "Unknown request type: " + request.getCmdType()); } } catch (IOException e) { - throw new ServiceException(e); + scmSecurityResponse.setSuccess(false); + scmSecurityResponse.setStatus(exceptionToResponseStatus(e)); + // If actual cause is set in SCMSecurityException, set message with + // actual cause message. + if (e.getMessage() != null) { + scmSecurityResponse.setMessage(e.getMessage()); + } else { + if (e.getCause() != null && e.getCause().getMessage() != null) { + scmSecurityResponse.setMessage(e.getCause().getMessage()); + } + } + return scmSecurityResponse.build(); + } + } + + /** + * Convert exception to corresponsing status. + * @param ex + * @return SCMSecurityProtocolProtos.Status code of the error. + */ + private Status exceptionToResponseStatus(IOException ex) { + if (ex instanceof SCMSecurityException) { + return Status.values()[ + ((SCMSecurityException) ex).getErrorCode().ordinal()]; + } else { + return Status.INTERNAL_ERROR; } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java index 3f3b36014a1d..5df3aa7d9194 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java @@ -54,6 +54,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.CERTIFICATE_NOT_FOUND; +import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.GET_CA_CERT_FAILED; +import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.GET_CERTIFICATE_FAILED; import static org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateApprover.ApprovalType.KERBEROS_TRUSTED; /** @@ -91,7 +94,8 @@ public class SCMSecurityProtocolServer implements SCMSecurityProtocol { BlockingService secureProtoPbService = SCMSecurityProtocolProtos.SCMSecurityProtocolService .newReflectiveBlockingService( - new SCMSecurityProtocolServerSideTranslatorPB(this, metrics)); + new SCMSecurityProtocolServerSideTranslatorPB(this, + scm, metrics)); this.rpcServer = StorageContainerManager.startRpcServer( conf, @@ -181,14 +185,34 @@ private String getEncodedCertToString(String certSignReq, NodeType nodeType) return CertificateCodec.getPEMEncodedString(future.get()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new IOException("generate" + nodeType.toString() + "Certificate " + - "operation failed. ", e); + throw generateException(e, nodeType); } catch (ExecutionException e) { - throw new IOException("generate" + nodeType.toString() + "Certificate " + - "operation failed.", e); + if (e.getCause() != null) { + if (e.getCause() instanceof SCMSecurityException) { + throw (SCMSecurityException) e.getCause(); + } else { + throw generateException(e, nodeType); + } + } else { + throw generateException(e, nodeType); + } } } + private SCMSecurityException generateException(Exception ex, NodeType role) { + SCMSecurityException.ErrorCode errorCode; + if (role == NodeType.SCM) { + errorCode = SCMSecurityException.ErrorCode.GET_SCM_CERTIFICATE_FAILED; + } else if (role == NodeType.OM) { + errorCode = SCMSecurityException.ErrorCode.GET_OM_CERTIFICATE_FAILED; + } else { + errorCode = SCMSecurityException.ErrorCode.GET_DN_CERTIFICATE_FAILED; + } + return new SCMSecurityException("generate " + role.toString() + + " Certificate operation failed", ex, errorCode); + + } + /** * Get SCM signed certificate with given serial id. * @@ -206,10 +230,12 @@ public String getCertificate(String certSerialId) throws IOException { return CertificateCodec.getPEMEncodedString(certificate); } } catch (CertificateException e) { - throw new IOException("getCertificate operation failed. ", e); + throw new SCMSecurityException("getCertificate operation failed. ", e, + GET_CERTIFICATE_FAILED); } LOGGER.debug("Certificate with serial id {} not found.", certSerialId); - throw new IOException("Certificate not found"); + throw new SCMSecurityException("Certificate not found", + CERTIFICATE_NOT_FOUND); } /** @@ -224,7 +250,8 @@ public String getCACertificate() throws IOException { return CertificateCodec.getPEMEncodedString( certificateServer.getCACertificate()); } catch (CertificateException e) { - throw new IOException("getRootCertificate operation failed. ", e); + throw new SCMSecurityException("getRootCertificate operation failed. ", + e, GET_CA_CERT_FAILED); } } @@ -249,7 +276,8 @@ public List listCertificate(NodeType role, String certStr = CertificateCodec.getPEMEncodedString(cert); results.add(certStr); } catch (SCMSecurityException e) { - throw new IOException("listCertificate operation failed. ", e); + throw new SCMSecurityException("listCertificate operation failed.", + e, e.getErrorCode()); } } return results; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java index 65c70b8fcc8f..f0adadd2284c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig; import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.security.exception.SCMSecurityException; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec; import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator; @@ -52,7 +53,6 @@ import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.Client; -import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.minikdc.MiniKdc; @@ -302,7 +302,8 @@ public void testSCMSecurityProtocol() throws Exception { assertNotNull(scmSecurityProtocolClient); String caCert = scmSecurityProtocolClient.getCACertificate(); assertNotNull(caCert); - LambdaTestUtils.intercept(RemoteException.class, "Certificate not found", + LambdaTestUtils.intercept(SCMSecurityException.class, + "Certificate not found", () -> scmSecurityProtocolClient.getCertificate("1")); // Case 2: User without Kerberos credentials should fail.