Skip to content

Commit

Permalink
HBASE-28417 TestBlockingIPC.testBadPreambleHeader sometimes fails wit…
Browse files Browse the repository at this point in the history
…h broken pipe instead of bad auth

Also change the IPC related tests to test different combinations of rpc server&client, for example, NettyRpcClient and SimpleRpcServer
  • Loading branch information
Apache9 committed Mar 5, 2024
1 parent cde1c9c commit db05609
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public void operationComplete(Future<Boolean> future) throws Exception {
});
}

private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) throws IOException {
private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) {
assert eventLoop.inEventLoop();
PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this,
RpcClient.REGISTRY_PREAMBLE_HEADER, connectionRegistryCall);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -73,15 +74,18 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.hamcrest.Matcher;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -114,14 +118,12 @@ public abstract class AbstractTestIPC {
private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);

protected static final Configuration CONF = HBaseConfiguration.create();
static {
// Set the default to be the old SimpleRpcServer. Subclasses test it and netty.
CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
}

protected abstract RpcServer createRpcServer(Server server, String name,
protected RpcServer createRpcServer(Server server, String name,
List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler) throws IOException;
RpcScheduler scheduler) throws IOException {
return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
}

private RpcServer createRpcServer(String name, List<BlockingServiceAndInterface> services,
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException {
Expand All @@ -133,6 +135,14 @@ private RpcServer createRpcServer(String name, List<BlockingServiceAndInterface>
@Rule
public OpenTelemetryRule traceRule = OpenTelemetryRule.create();

@Parameter(0)
public Class<? extends RpcServer> rpcServerImpl;

@Before
public void setUpBeforeTest() {
CONF.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, RpcServer.class);
}

/**
* Ensure we do not HAVE TO HAVE a codec.
*/
Expand Down Expand Up @@ -348,9 +358,43 @@ public void testTimeout() throws IOException {
}
}

protected abstract RpcServer createTestFailingRpcServer(final String name,
private static class FailingSimpleRpcServer extends SimpleRpcServer {

FailingSimpleRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
super(server, name, services, bindAddress, conf, scheduler, true);
}

final class FailingConnection extends SimpleServerRpcConnection {
private FailingConnection(FailingSimpleRpcServer rpcServer, SocketChannel channel,
long lastContact) {
super(rpcServer, channel, lastContact);
}

@Override
public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
// this will throw exception after the connection header is read, and an RPC is sent
// from client
throw new DoNotRetryIOException("Failing for test");
}
}

@Override
protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
return new FailingConnection(this, channel, time);
}
}

protected RpcServer createTestFailingRpcServer(final String name,
final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException;
Configuration conf, RpcScheduler scheduler) throws IOException {
if (rpcServerImpl.equals(NettyRpcServer.class)) {
return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);
} else {
return new FailingSimpleRpcServer(null, name, services, bindAddress, conf, scheduler);
}
}

/** Tests that the connection closing is handled by the client with outstanding RPC calls */
@Test
Expand Down Expand Up @@ -570,19 +614,33 @@ public void testTracingErrorIpc() throws IOException {

protected abstract AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf);

private IOException doBadPreableHeaderCall(BlockingInterface stub) {
ServiceException se = assertThrows(ServiceException.class,
() -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));
return ProtobufUtil.handleRemoteException(se);
}

@Test
public void testBadPreambleHeader() throws IOException, ServiceException {
public void testBadPreambleHeader() throws Exception {
Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createBadAuthRpcClient(clientConf)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
ServiceException se = assertThrows(ServiceException.class,
() -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));
IOException ioe = ProtobufUtil.handleRemoteException(se);
assertThat(ioe, instanceOf(BadAuthException.class));
assertThat(ioe.getMessage(), containsString("authName=unknown"));
BadAuthException error = null;
// for SimpleRpcServer, it is possible that we get a broken pipe before getting the
// BadAuthException, so we add some retries here, see HBASE-28417
for (int i = 0; i < 10; i++) {
IOException ioe = doBadPreableHeaderCall(stub);
if (ioe instanceof BadAuthException) {
error = (BadAuthException) ioe;
break;
}
Thread.sleep(100);
}
assertNotNull("Can not get expected BadAuthException", error);
assertThat(error.getMessage(), containsString("authName=unknown"));
} finally {
rpcServer.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,31 @@
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
@Category({ RPCTests.class, MediumTests.class })
public class TestBlockingIPC extends AbstractTestIPC {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBlockingIPC.class);

@Override
protected RpcServer createRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
@Parameters(name = "{index}: rpcServerImpl={0}")
public static List<Object[]> data() {
return Arrays.asList(new Object[] { SimpleRpcServer.class },
new Object[] { NettyRpcServer.class });
}

@Override
Expand Down Expand Up @@ -73,41 +72,6 @@ protected boolean isTcpNoDelay() {
};
}

private static class TestFailingRpcServer extends SimpleRpcServer {

TestFailingRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
super(server, name, services, bindAddress, conf, scheduler, true);
}

final class FailingConnection extends SimpleServerRpcConnection {
private FailingConnection(TestFailingRpcServer rpcServer, SocketChannel channel,
long lastContact) {
super(rpcServer, channel, lastContact);
}

@Override
public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
// this will throw exception after the connection header is read, and an RPC is sent
// from client
throw new DoNotRetryIOException("Failing for test");
}
}

@Override
protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
return new FailingConnection(this, channel, time);
}
}

@Override
protected RpcServer createTestFailingRpcServer(String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
return new TestFailingRpcServer(null, name, services, bindAddress, conf, scheduler);
}

@Override
protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
return new BlockingRpcClient(conf) {
Expand All @@ -124,7 +88,6 @@ protected byte[] getConnectionHeaderPreamble() {
}
};
}

};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
Expand All @@ -51,18 +48,27 @@ public class TestNettyIPC extends AbstractTestIPC {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestNettyIPC.class);

@Parameters(name = "{index}: EventLoop={0}")
public static Collection<Object[]> parameters() {
List<Object[]> params = new ArrayList<>();
params.add(new Object[] { "nio" });
params.add(new Object[] { "perClientNio" });
private static List<String> getEventLoopTypes() {
List<String> types = new ArrayList<>();
types.add("nio");
types.add("perClientNio");
if (JVM.isLinux() && JVM.isAmd64()) {
params.add(new Object[] { "epoll" });
types.add("epoll");
}
return types;
}

@Parameters(name = "{index}: rpcServerImpl={0}, EventLoop={1}")
public static List<Object[]> parameters() {
List<Object[]> params = new ArrayList<>();
for (String eventLoopType : getEventLoopTypes()) {
params.add(new Object[] { SimpleRpcServer.class, eventLoopType });
params.add(new Object[] { NettyRpcServer.class, eventLoopType });
}
return params;
}

@Parameter
@Parameter(1)
public String eventLoopType;

private static NioEventLoopGroup NIO;
Expand Down Expand Up @@ -103,13 +109,6 @@ private void setConf(Configuration conf) {
}
}

@Override
protected RpcServer createRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler, true);
}

@Override
protected NettyRpcClient createRpcClientNoCodec(Configuration conf) {
setConf(conf);
Expand Down Expand Up @@ -141,13 +140,6 @@ protected boolean isTcpNoDelay() {
};
}

@Override
protected RpcServer createTestFailingRpcServer(String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);
}

@Override
protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
return new NettyRpcClient(conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,37 +67,41 @@ public class TestNettyTlsIPC extends AbstractTestIPC {

private static NettyEventLoopGroupConfig EVENT_LOOP_GROUP_CONFIG;

@Parameterized.Parameter(0)
@Parameterized.Parameter(1)
public X509KeyType caKeyType;

@Parameterized.Parameter(1)
@Parameterized.Parameter(2)
public X509KeyType certKeyType;

@Parameterized.Parameter(2)
@Parameterized.Parameter(3)
public char[] keyPassword;

@Parameterized.Parameter(3)
@Parameterized.Parameter(4)
public boolean acceptPlainText;

@Parameterized.Parameter(4)
@Parameterized.Parameter(5)
public boolean clientTlsEnabled;

private X509TestContext x509TestContext;

// only netty rpc server supports TLS, so here we will only test NettyRpcServer
@Parameterized.Parameters(
name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, acceptPlainText={3},"
+ " clientTlsEnabled={4}")
name = "{index}: rpcServerImpl={0}, caKeyType={1}, certKeyType={2}, keyPassword={3},"
+ " acceptPlainText={4}, clientTlsEnabled={5}")
public static List<Object[]> data() {
List<Object[]> params = new ArrayList<>();
for (X509KeyType caKeyType : X509KeyType.values()) {
for (X509KeyType certKeyType : X509KeyType.values()) {
for (char[] keyPassword : new char[][] { "".toCharArray(), "pa$$w0rd".toCharArray() }) {
// do not accept plain text
params.add(new Object[] { caKeyType, certKeyType, keyPassword, false, true });
params.add(new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword,
false, true });
// support plain text and client enables tls
params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, true });
params.add(
new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword, true, true });
// support plain text and client disables tls
params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, false });
params.add(new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword, true,
false });
}
}
}
Expand Down

0 comments on commit db05609

Please sign in to comment.