Skip to content

Commit

Permalink
HBASE-18115 Move SaslServer creation to HBaseSaslRpcServer
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed May 27, 2017
1 parent 97484f2 commit efc7edc
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 108 deletions.
Expand Up @@ -24,6 +24,7 @@
import javax.security.sasl.Sasl; import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient; import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException; import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;


import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -97,7 +98,7 @@ public static QualityOfProtection getQop(String stringQop) {
* @param rpcProtection Value of 'hbase.rpc.protection' configuration. * @param rpcProtection Value of 'hbase.rpc.protection' configuration.
* @return Map with values for SASL properties. * @return Map with values for SASL properties.
*/ */
static Map<String, String> initSaslProperties(String rpcProtection) { public static Map<String, String> initSaslProperties(String rpcProtection) {
String saslQop; String saslQop;
if (rpcProtection.isEmpty()) { if (rpcProtection.isEmpty()) {
saslQop = QualityOfProtection.AUTHENTICATION.getSaslQop(); saslQop = QualityOfProtection.AUTHENTICATION.getSaslQop();
Expand All @@ -123,4 +124,12 @@ static void safeDispose(SaslClient saslClient) {
LOG.error("Error disposing of SASL client", e); LOG.error("Error disposing of SASL client", e);
} }
} }

static void safeDispose(SaslServer saslServer) {
try {
saslServer.dispose();
} catch (SaslException e) {
LOG.error("Error disposing of SASL server", e);
}
}
} }
Expand Up @@ -30,8 +30,10 @@
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;


Expand All @@ -55,7 +57,8 @@
import org.apache.hadoop.hbase.nio.MultiByteBuff; import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
Expand Down Expand Up @@ -112,6 +115,7 @@ public abstract class RpcServer implements RpcServerInterface,
protected static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." protected static final Log AUDITLOG = LogFactory.getLog("SecurityLogger."
+ Server.class.getName()); + Server.class.getName());
protected SecretManager<TokenIdentifier> secretManager; protected SecretManager<TokenIdentifier> secretManager;
protected final Map<String, String> saslProps;
protected ServiceAuthorizationManager authManager; protected ServiceAuthorizationManager authManager;


/** This is set to Call object before Handler invokes an RPC and ybdie /** This is set to Call object before Handler invokes an RPC and ybdie
Expand Down Expand Up @@ -307,7 +311,10 @@ public RpcServer(final Server server, final String name,
this.userProvider = UserProvider.instantiate(conf); this.userProvider = UserProvider.instantiate(conf);
this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled(); this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
if (isSecurityEnabled) { if (isSecurityEnabled) {
HBaseSaslRpcServer.init(conf); saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
} else {
saslProps = Collections.emptyMap();
} }


this.scheduler = scheduler; this.scheduler = scheduler;
Expand Down
Expand Up @@ -29,18 +29,14 @@
import java.nio.channels.Channels; import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import java.util.Properties; import java.util.Properties;


import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;

import org.apache.commons.crypto.cipher.CryptoCipherFactory; import org.apache.commons.crypto.cipher.CryptoCipherFactory;
import org.apache.commons.crypto.random.CryptoRandom; import org.apache.commons.crypto.random.CryptoRandom;
import org.apache.commons.crypto.random.CryptoRandomFactory; import org.apache.commons.crypto.random.CryptoRandomFactory;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
Expand All @@ -51,8 +47,6 @@
import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.hbase.security.SaslStatus; import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
Expand Down Expand Up @@ -89,6 +83,7 @@
@edu.umd.cs.findbugs.annotations.SuppressWarnings( @edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="VO_VOLATILE_INCREMENT", value="VO_VOLATILE_INCREMENT",
justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
@InterfaceAudience.Private
abstract class ServerRpcConnection implements Closeable { abstract class ServerRpcConnection implements Closeable {
/** */ /** */
protected final RpcServer rpcServer; protected final RpcServer rpcServer;
Expand Down Expand Up @@ -121,7 +116,7 @@ abstract class ServerRpcConnection implements Closeable {
// When is this set? FindBugs wants to know! Says NP // When is this set? FindBugs wants to know! Says NP
private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4); private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
protected boolean useSasl; protected boolean useSasl;
protected SaslServer saslServer; protected HBaseSaslRpcServer saslServer;
protected CryptoAES cryptoAES; protected CryptoAES cryptoAES;
protected boolean useWrap = false; protected boolean useWrap = false;
protected boolean useCryptoAesWrap = false; protected boolean useCryptoAesWrap = false;
Expand All @@ -131,7 +126,6 @@ abstract class ServerRpcConnection implements Closeable {


protected boolean retryImmediatelySupported = false; protected boolean retryImmediatelySupported = false;


private UserGroupInformation attemptingUser = null; // user name before auth
protected User user = null; protected User user = null;
protected UserGroupInformation ugi = null; protected UserGroupInformation ugi = null;


Expand Down Expand Up @@ -164,13 +158,13 @@ public VersionInfo getVersionInfo() {
return null; return null;
} }


protected String getFatalConnectionString(final int version, final byte authByte) { private String getFatalConnectionString(final int version, final byte authByte) {
return "serverVersion=" + RpcServer.CURRENT_VERSION + return "serverVersion=" + RpcServer.CURRENT_VERSION +
", clientVersion=" + version + ", authMethod=" + authByte + ", clientVersion=" + version + ", authMethod=" + authByte +
", authSupported=" + (authMethod != null) + " from " + toString(); ", authSupported=" + (authMethod != null) + " from " + toString();
} }


protected UserGroupInformation getAuthorizedUgi(String authorizedId) private UserGroupInformation getAuthorizedUgi(String authorizedId)
throws IOException { throws IOException {
UserGroupInformation authorizedUgi; UserGroupInformation authorizedUgi;
if (authMethod == AuthMethod.DIGEST) { if (authMethod == AuthMethod.DIGEST) {
Expand All @@ -193,7 +187,7 @@ protected UserGroupInformation getAuthorizedUgi(String authorizedId)
* Set up cell block codecs * Set up cell block codecs
* @throws FatalConnectionException * @throws FatalConnectionException
*/ */
protected void setupCellBlockCodecs(final ConnectionHeader header) private void setupCellBlockCodecs(final ConnectionHeader header)
throws FatalConnectionException { throws FatalConnectionException {
// TODO: Plug in other supported decoders. // TODO: Plug in other supported decoders.
if (!header.hasCellBlockCodecClass()) return; if (!header.hasCellBlockCodecClass()) return;
Expand All @@ -218,13 +212,13 @@ protected void setupCellBlockCodecs(final ConnectionHeader header)
* *
* @throws FatalConnectionException * @throws FatalConnectionException
*/ */
protected void setupCryptoCipher(final ConnectionHeader header, private void setupCryptoCipher(final ConnectionHeader header,
RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
throws FatalConnectionException { throws FatalConnectionException {
// If simple auth, return // If simple auth, return
if (saslServer == null) return; if (saslServer == null) return;
// check if rpc encryption with Crypto AES // check if rpc encryption with Crypto AES
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); String qop = saslServer.getNegotiatedQop();
boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY
.getSaslQop().equalsIgnoreCase(qop); .getSaslQop().equalsIgnoreCase(qop);
boolean isCryptoAesEncryption = isEncryption && this.rpcServer.conf.getBoolean( boolean isCryptoAesEncryption = isEncryption && this.rpcServer.conf.getBoolean(
Expand Down Expand Up @@ -289,7 +283,7 @@ private ByteString getByteString(byte[] bytes) {
return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes); return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
} }


protected UserGroupInformation createUser(ConnectionHeader head) { private UserGroupInformation createUser(ConnectionHeader head) {
UserGroupInformation ugi = null; UserGroupInformation ugi = null;


if (!head.hasUserInfo()) { if (!head.hasUserInfo()) {
Expand All @@ -316,14 +310,10 @@ protected UserGroupInformation createUser(ConnectionHeader head) {
return ugi; return ugi;
} }


protected void disposeSasl() { protected final void disposeSasl() {
if (saslServer != null) { if (saslServer != null) {
try { saslServer.dispose();
saslServer.dispose(); saslServer = null;
saslServer = null;
} catch (SaslException ignored) {
// Ignored. This is being disposed of anyway.
}
} }
} }


Expand Down Expand Up @@ -373,45 +363,11 @@ public void saslReadAndProcess(ByteBuff saslToken) throws IOException,
byte[] replyToken; byte[] replyToken;
try { try {
if (saslServer == null) { if (saslServer == null) {
switch (authMethod) { saslServer =
case DIGEST: new HBaseSaslRpcServer(authMethod, rpcServer.saslProps, rpcServer.secretManager);
if (this.rpcServer.secretManager == null) {
throw new AccessDeniedException(
"Server is not configured to do DIGEST authentication.");
}
saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
.getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
this.rpcServer.secretManager, ugi -> attemptingUser = ugi));
break;
default:
UserGroupInformation current = UserGroupInformation.getCurrentUser();
String fullName = current.getUserName();
if (RpcServer.LOG.isDebugEnabled()) {
RpcServer.LOG.debug("Kerberos principal name is " + fullName);
}
final String names[] = SaslUtil.splitKerberosName(fullName);
if (names.length != 3) {
throw new AccessDeniedException(
"Kerberos principal name does NOT have the expected "
+ "hostname part: " + fullName);
}
current.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws SaslException {
saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
.getMechanismName(), names[0], names[1],
HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
return null;
}
});
}
if (saslServer == null)
throw new AccessDeniedException(
"Unable to find SASL server implementation for "
+ authMethod.getMechanismName());
if (RpcServer.LOG.isDebugEnabled()) { if (RpcServer.LOG.isDebugEnabled()) {
RpcServer.LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName()); RpcServer.LOG
.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
} }
} }
if (RpcServer.LOG.isDebugEnabled()) { if (RpcServer.LOG.isDebugEnabled()) {
Expand All @@ -435,7 +391,8 @@ public Object run() throws SaslException {
this.rpcServer.metrics.authenticationFailure(); this.rpcServer.metrics.authenticationFailure();
String clientIP = this.toString(); String clientIP = this.toString();
// attempting user could be null // attempting user could be null
RpcServer.AUDITLOG.warn(RpcServer.AUTH_FAILED_FOR + clientIP + ":" + attemptingUser); RpcServer.AUDITLOG
.warn(RpcServer.AUTH_FAILED_FOR + clientIP + ":" + saslServer.getAttemptingUser());
throw e; throw e;
} }
if (replyToken != null) { if (replyToken != null) {
Expand All @@ -447,13 +404,12 @@ public Object run() throws SaslException {
null); null);
} }
if (saslServer.isComplete()) { if (saslServer.isComplete()) {
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); String qop = saslServer.getNegotiatedQop();
useWrap = qop != null && !"auth".equalsIgnoreCase(qop); useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
ugi = getAuthorizedUgi(saslServer.getAuthorizationID()); ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
if (RpcServer.LOG.isDebugEnabled()) { if (RpcServer.LOG.isDebugEnabled()) {
RpcServer.LOG.debug("SASL server context established. Authenticated client: " RpcServer.LOG.debug("SASL server context established. Authenticated client: " + ugi +
+ ugi + ". Negotiated QoP is " ". Negotiated QoP is " + qop);
+ saslServer.getNegotiatedProperty(Sasl.QOP));
} }
this.rpcServer.metrics.authenticationSuccess(); this.rpcServer.metrics.authenticationSuccess();
RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi); RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
Expand Down

0 comments on commit efc7edc

Please sign in to comment.