Skip to content

Commit

Permalink
HBASE-28377 Fallback to simple is broken for blocking rpc client (#5690)
Browse files Browse the repository at this point in the history
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
(cherry picked from commit 7bc07a6)
  • Loading branch information
Apache9 committed Feb 19, 2024
1 parent c4b5ac9 commit 2b71b56
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ public Boolean run() throws IOException {
// fall back to simple auth because server told us so.
// do not change authMethod and useSasl here, we should start from secure when
// reconnecting because regionserver may change its sasl config after restart.
saslRpcClient = null;
}
}
this.in = new DataInputStream(new BufferedInputStream(inStream));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.security.sasl.SaslException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RemoteException;
Expand Down Expand Up @@ -107,12 +108,9 @@ public boolean saslConnect(InputStream inS, OutputStream outS) throws IOExceptio
int len = inStream.readInt();
if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
if (!fallbackAllowed) {
throw new IOException("Server asks us to fall back to SIMPLE auth, "
+ "but this client is configured to only allow secure connections.");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Server asks us to fall back to simple auth.");
throw new FallbackDisallowedException();
}
LOG.debug("Server asks us to fall back to simple auth.");
dispose();
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,20 @@
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -42,15 +48,16 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
Expand All @@ -61,10 +68,8 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
Expand Down Expand Up @@ -100,9 +105,6 @@ public class TestSecureIPC {
Configuration clientConf;
Configuration serverConf;

@Rule
public ExpectedException exception = ExpectedException.none();

@Parameters(name = "{index}: rpcClientImpl={0}, rpcServerImpl={1}")
public static Collection<Object[]> parameters() {
List<Object[]> params = new ArrayList<>();
Expand Down Expand Up @@ -164,7 +166,7 @@ public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception {
}

@Test
public void testRpcFallbackToSimpleAuth() throws Exception {
public void testRpcServerFallbackToSimpleAuth() throws Exception {
String clientUsername = "testuser";
UserGroupInformation clientUgi =
UserGroupInformation.createUserForTesting(clientUsername, new String[] { clientUsername });
Expand All @@ -179,7 +181,60 @@ public void testRpcFallbackToSimpleAuth() throws Exception {
callRpcService(User.create(clientUgi));
}

void setRpcProtection(String clientProtection, String serverProtection) {
@Test
public void testRpcServerDisallowFallbackToSimpleAuth() throws Exception {
String clientUsername = "testuser";
UserGroupInformation clientUgi =
UserGroupInformation.createUserForTesting(clientUsername, new String[] { clientUsername });

// check that the client user is insecure
assertNotSame(ugi, clientUgi);
assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
assertEquals(clientUsername, clientUgi.getUserName());

clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
IOException error =
assertThrows(IOException.class, () -> callRpcService(User.create(clientUgi)));
// server just closes the connection, so we could get broken pipe, or EOF, or connection closed
if (error.getMessage() == null || !error.getMessage().contains("Broken pipe")) {
assertThat(error,
either(instanceOf(EOFException.class)).or(instanceOf(ConnectionClosedException.class)));
}
}

@Test
public void testRpcClientFallbackToSimpleAuth() throws Exception {
String serverUsername = "testuser";
UserGroupInformation serverUgi =
UserGroupInformation.createUserForTesting(serverUsername, new String[] { serverUsername });
// check that the server user is insecure
assertNotSame(ugi, serverUgi);
assertEquals(AuthenticationMethod.SIMPLE, serverUgi.getAuthenticationMethod());
assertEquals(serverUsername, serverUgi.getUserName());

serverConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
clientConf.setBoolean(RpcClient.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, true);
callRpcService(User.create(serverUgi), User.create(ugi));
}

@Test
public void testRpcClientDisallowFallbackToSimpleAuth() throws Exception {
String serverUsername = "testuser";
UserGroupInformation serverUgi =
UserGroupInformation.createUserForTesting(serverUsername, new String[] { serverUsername });
// check that the server user is insecure
assertNotSame(ugi, serverUgi);
assertEquals(AuthenticationMethod.SIMPLE, serverUgi.getAuthenticationMethod());
assertEquals(serverUsername, serverUgi.getUserName());

serverConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
clientConf.setBoolean(RpcClient.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, false);
assertThrows(FallbackDisallowedException.class,
() -> callRpcService(User.create(serverUgi), User.create(ugi)));
}

private void setRpcProtection(String clientProtection, String serverProtection) {
clientConf.set("hbase.rpc.protection", clientProtection);
serverConf.set("hbase.rpc.protection", serverProtection);
}
Expand All @@ -190,27 +245,26 @@ void setRpcProtection(String clientProtection, String serverProtection) {
@Test
public void testSaslWithCommonQop() throws Exception {
setRpcProtection("privacy,authentication", "authentication");
callRpcService(User.create(ugi));
callRpcService();

setRpcProtection("authentication", "privacy,authentication");
callRpcService(User.create(ugi));
callRpcService();

setRpcProtection("integrity,authentication", "privacy,authentication");
callRpcService(User.create(ugi));
callRpcService();

setRpcProtection("integrity,authentication", "integrity,authentication");
callRpcService(User.create(ugi));
callRpcService();

setRpcProtection("privacy,authentication", "privacy,authentication");
callRpcService(User.create(ugi));
callRpcService();
}

@Test
public void testSaslNoCommonQop() throws Exception {
exception.expect(SaslException.class);
exception.expectMessage("No common protection layer between client and server");
setRpcProtection("integrity", "privacy");
callRpcService(User.create(ugi));
SaslException se = assertThrows(SaslException.class, () -> callRpcService());
assertEquals("No common protection layer between client and server", se.getMessage());
}

/**
Expand All @@ -220,7 +274,7 @@ public void testSaslNoCommonQop() throws Exception {
public void testSaslWithCryptoAES() throws Exception {
setRpcProtection("privacy", "privacy");
setCryptoAES("true", "true");
callRpcService(User.create(ugi));
callRpcService();
}

/**
Expand All @@ -231,11 +285,11 @@ public void testDifferentConfWithCryptoAES() throws Exception {
setRpcProtection("privacy", "privacy");

setCryptoAES("false", "true");
callRpcService(User.create(ugi));
callRpcService();

setCryptoAES("true", "false");
try {
callRpcService(User.create(ugi));
callRpcService();
fail("The exception should be thrown out for the rpc timeout.");
} catch (Exception e) {
// ignore the expected exception
Expand All @@ -260,18 +314,20 @@ private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krb
* Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from
* the stub, this function will throw root cause of that exception.
*/
private void callRpcService(User clientUser) throws Exception {
private void callRpcService(User serverUser, User clientUser) throws Exception {
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
Mockito.when(securityInfoMock.getServerPrincipal())
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);

InetSocketAddress isa = new InetSocketAddress(HOST, 0);

RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
Lists
.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
isa, serverConf, new FifoRpcScheduler(serverConf, 1));
RpcServer rpcServer = serverUser.getUGI()
.doAs((PrivilegedExceptionAction<
RpcServer>) () -> RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
Lists.newArrayList(
new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
isa, serverConf, new FifoRpcScheduler(serverConf, 1)));
rpcServer.start();
try (RpcClient rpcClient =
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
Expand Down Expand Up @@ -301,6 +357,14 @@ public void uncaughtException(Thread th, Throwable ex) {
}
}

private void callRpcService(User clientUser) throws Exception {
callRpcService(User.create(ugi), clientUser);
}

private void callRpcService() throws Exception {
callRpcService(User.create(ugi));
}

public static class TestThread extends Thread {
private final BlockingInterface stub;

Expand Down

0 comments on commit 2b71b56

Please sign in to comment.