Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28377 Fallback to simple is broken for blocking rpc client #5690

Merged
merged 2 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ public Boolean run() throws IOException {
// fall back to simple auth because server told us so.
// do not change authMethod and useSasl here, we should start from secure when
// reconnecting because regionserver may change its sasl config after restart.
saslRpcClient = null;
}
}
createStreams(inStream, outStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.security.sasl.SaslException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RemoteException;
Expand Down Expand Up @@ -107,12 +108,9 @@ public boolean saslConnect(InputStream inS, OutputStream outS) throws IOExceptio
int len = inStream.readInt();
if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
if (!fallbackAllowed) {
throw new IOException("Server asks us to fall back to SIMPLE auth, "
+ "but this client is configured to only allow secure connections.");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Server asks us to fall back to simple auth.");
throw new FallbackDisallowedException();
}
LOG.debug("Server asks us to fall back to simple auth.");
dispose();
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,22 @@
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.loginKerberosPrincipal;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration;
import static org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders.SELECTOR_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
Expand All @@ -44,12 +49,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.security.provider.AuthenticationProviderSelector;
import org.apache.hadoop.hbase.security.provider.BuiltInProviderSelector;
import org.apache.hadoop.hbase.security.provider.SaslAuthMethod;
Expand Down Expand Up @@ -95,6 +101,7 @@ protected static void initKDCAndConf() throws Exception {
// set a smaller timeout and retry to speed up tests
TEST_UTIL.getConfiguration().setInt(RpcClient.SOCKET_TIMEOUT_READ, 2000);
TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 1);
TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxbackoff", 100);
}

protected static void stopKDC() throws InterruptedException {
Expand Down Expand Up @@ -237,7 +244,7 @@ public String getTokenKind() {
}

@Test
public void testRpcFallbackToSimpleAuth() throws Exception {
public void testRpcServerFallbackToSimpleAuth() throws Exception {
String clientUsername = "testuser";
UserGroupInformation clientUgi =
UserGroupInformation.createUserForTesting(clientUsername, new String[] { clientUsername });
Expand All @@ -252,6 +259,59 @@ public void testRpcFallbackToSimpleAuth() throws Exception {
callRpcService(User.create(clientUgi));
}

@Test
public void testRpcServerDisallowFallbackToSimpleAuth() throws Exception {
String clientUsername = "testuser";
UserGroupInformation clientUgi =
UserGroupInformation.createUserForTesting(clientUsername, new String[] { clientUsername });

// check that the client user is insecure
assertNotSame(ugi, clientUgi);
assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
assertEquals(clientUsername, clientUgi.getUserName());

clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
IOException error =
assertThrows(IOException.class, () -> callRpcService(User.create(clientUgi)));
// server just closes the connection, so we could get broken pipe, or EOF, or connection closed
if (error.getMessage() == null || !error.getMessage().contains("Broken pipe")) {
assertThat(error,
either(instanceOf(EOFException.class)).or(instanceOf(ConnectionClosedException.class)));
}
}

@Test
public void testRpcClientFallbackToSimpleAuth() throws Exception {
String serverUsername = "testuser";
UserGroupInformation serverUgi =
UserGroupInformation.createUserForTesting(serverUsername, new String[] { serverUsername });
// check that the server user is insecure
assertNotSame(ugi, serverUgi);
assertEquals(AuthenticationMethod.SIMPLE, serverUgi.getAuthenticationMethod());
assertEquals(serverUsername, serverUgi.getUserName());

serverConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
clientConf.setBoolean(RpcClient.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, true);
callRpcService(User.create(serverUgi), User.create(ugi));
}

@Test
public void testRpcClientDisallowFallbackToSimpleAuth() throws Exception {
String serverUsername = "testuser";
UserGroupInformation serverUgi =
UserGroupInformation.createUserForTesting(serverUsername, new String[] { serverUsername });
// check that the server user is insecure
assertNotSame(ugi, serverUgi);
assertEquals(AuthenticationMethod.SIMPLE, serverUgi.getAuthenticationMethod());
assertEquals(serverUsername, serverUgi.getUserName());

serverConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
clientConf.setBoolean(RpcClient.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, false);
assertThrows(FallbackDisallowedException.class,
() -> callRpcService(User.create(serverUgi), User.create(ugi)));
}

private void setRpcProtection(String clientProtection, String serverProtection) {
clientConf.set("hbase.rpc.protection", clientProtection);
serverConf.set("hbase.rpc.protection", serverProtection);
Expand All @@ -263,25 +323,25 @@ private void setRpcProtection(String clientProtection, String serverProtection)
@Test
public void testSaslWithCommonQop() throws Exception {
setRpcProtection("privacy,authentication", "authentication");
callRpcService(User.create(ugi));
callRpcService();

setRpcProtection("authentication", "privacy,authentication");
callRpcService(User.create(ugi));
callRpcService();

setRpcProtection("integrity,authentication", "privacy,authentication");
callRpcService(User.create(ugi));
callRpcService();

setRpcProtection("integrity,authentication", "integrity,authentication");
callRpcService(User.create(ugi));
callRpcService();

setRpcProtection("privacy,authentication", "privacy,authentication");
callRpcService(User.create(ugi));
callRpcService();
}

@Test
public void testSaslNoCommonQop() throws Exception {
setRpcProtection("integrity", "privacy");
SaslException se = assertThrows(SaslException.class, () -> callRpcService(User.create(ugi)));
SaslException se = assertThrows(SaslException.class, () -> callRpcService());
assertEquals("No common protection layer between client and server", se.getMessage());
}

Expand All @@ -292,7 +352,7 @@ public void testSaslNoCommonQop() throws Exception {
public void testSaslWithCryptoAES() throws Exception {
setRpcProtection("privacy", "privacy");
setCryptoAES("true", "true");
callRpcService(User.create(ugi));
callRpcService();
}

/**
Expand All @@ -303,11 +363,11 @@ public void testDifferentConfWithCryptoAES() throws Exception {
setRpcProtection("privacy", "privacy");

setCryptoAES("false", "true");
callRpcService(User.create(ugi));
callRpcService();

setCryptoAES("true", "false");
try {
callRpcService(User.create(ugi));
callRpcService();
fail("The exception should be thrown out for the rpc timeout.");
} catch (Exception e) {
// ignore the expected exception
Expand All @@ -323,18 +383,20 @@ private void setCryptoAES(String clientCryptoAES, String serverCryptoAES) {
* Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from
* the stub, this function will throw root cause of that exception.
*/
private void callRpcService(User clientUser) throws Exception {
private void callRpcService(User serverUser, User clientUser) throws Exception {
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
Mockito.when(securityInfoMock.getServerPrincipal())
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);

InetSocketAddress isa = new InetSocketAddress(HOST, 0);

RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
Lists
.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
isa, serverConf, new FifoRpcScheduler(serverConf, 1));
RpcServer rpcServer = serverUser.getUGI()
.doAs((PrivilegedExceptionAction<
RpcServer>) () -> RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
Lists.newArrayList(
new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
isa, serverConf, new FifoRpcScheduler(serverConf, 1)));
rpcServer.start();
try (RpcClient rpcClient =
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
Expand Down Expand Up @@ -364,6 +426,14 @@ public void uncaughtException(Thread th, Throwable ex) {
}
}

private void callRpcService(User clientUser) throws Exception {
callRpcService(User.create(ugi), clientUser);
}

private void callRpcService() throws Exception {
callRpcService(User.create(ugi));
}

public static class TestThread extends Thread {
private final BlockingInterface stub;

Expand Down