Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public enum Property {
"Properties in this category related to the configuration of SSL keys for"
+ " RPC. See also instance.ssl.enabled",
"1.6.0"),
RPC_BACKLOG("rpc.backlog", "50", PropertyType.COUNT,
"Configures the TCP backlog for the server side sockets created by Thrift."
+ " This property is not used for SSL type server sockets. A value of zero"
+ " will use the Thrift default value.",
"2.1.3"),
RPC_SSL_KEYSTORE_PATH("rpc.javax.net.ssl.keyStore", "", PropertyType.PATH,
"Path of the keystore file for the server's private SSL key", "1.6.0"),
@Sensitive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerSocket.NonblockingAbstractServerSocketArgs;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TSaslServerTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerSocket.ServerSocketTransportArgs;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
Expand Down Expand Up @@ -158,6 +160,8 @@ public static ServerAddress startServer(ServerContext context, String hostname,
portSearch = config.getBoolean(portSearchProperty);
}

int backlog = config.getCount(Property.RPC_BACKLOG);

final ThriftServerType serverType = context.getThriftServerType();

if (serverType == ThriftServerType.SASL) {
Expand All @@ -174,7 +178,7 @@ public static ServerAddress startServer(ServerContext context, String hostname,
return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName,
minThreads, threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize,
context.getServerSslParams(), context.getSaslParams(), context.getClientTimeoutInMillis(),
addresses);
backlog, addresses);
} catch (TTransportException e) {
if (portSearch) {
// Build a list of reserved ports - as identified by properties of type PropertyType.PORT
Expand All @@ -199,7 +203,7 @@ public static ServerAddress startServer(ServerContext context, String hostname,
return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName,
minThreads, threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize,
context.getServerSslParams(), context.getSaslParams(),
context.getClientTimeoutInMillis(), addr);
context.getClientTimeoutInMillis(), backlog, addr);
} catch (TTransportException tte) {
log.info("Unable to use port {}, retrying. (Thread Name = {})", port, threadName);
}
Expand All @@ -220,11 +224,13 @@ public static ServerAddress startServer(ServerContext context, String hostname,
private static ServerAddress createThreadedSelectorServer(HostAndPort address,
TProcessor processor, TProtocolFactory protocolFactory, final String serverName,
final int numThreads, final long threadTimeOut, final AccumuloConfiguration conf,
long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
long timeBetweenThreadChecks, long maxMessageSize, int backlog) throws TTransportException {

NonblockingAbstractServerSocketArgs args = new NonblockingAbstractServerSocketArgs()
.backlog(backlog).bindAddr(new InetSocketAddress(address.getHost(), address.getPort()))
.clientTimeout(0).maxFrameSize(Ints.saturatedCast(maxMessageSize));

final TNonblockingServerSocket transport =
new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort()), 0,
Ints.saturatedCast(maxMessageSize));
final TNonblockingServerSocket transport = new TNonblockingServerSocket(args);

TThreadedSelectorServer.Args options = new TThreadedSelectorServer.Args(transport);

Expand Down Expand Up @@ -256,11 +262,13 @@ private static ServerAddress createThreadedSelectorServer(HostAndPort address,
private static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor,
TProtocolFactory protocolFactory, final String serverName, final int numThreads,
final long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks,
long maxMessageSize) throws TTransportException {
long maxMessageSize, int backlog) throws TTransportException {

NonblockingAbstractServerSocketArgs args = new NonblockingAbstractServerSocketArgs()
.backlog(backlog).bindAddr(new InetSocketAddress(address.getHost(), address.getPort()))
.clientTimeout(0).maxFrameSize(Ints.saturatedCast(maxMessageSize));

final TNonblockingServerSocket transport =
new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort()), 0,
Ints.saturatedCast(maxMessageSize));
final TNonblockingServerSocket transport = new TNonblockingServerSocket(args);
final CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport);

options.protocolFactory(protocolFactory);
Expand Down Expand Up @@ -329,12 +337,15 @@ private static ThreadPoolExecutor createSelfResizingThreadPool(final String serv
*/
private static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor,
TProtocolFactory protocolFactory, long maxMessageSize, String serverName, int numThreads,
long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks)
throws TTransportException {
long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks,
int backlog) throws TTransportException {

InetSocketAddress isa = new InetSocketAddress(address.getHost(), address.getPort());
// Must use an ISA, providing only a port would ignore the hostname given
TServerSocket transport = new TServerSocket(isa);

ServerSocketTransportArgs args = new ServerSocketTransportArgs().backlog(backlog).bindAddr(isa);

TServerSocket transport = new TServerSocket(args);
ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut,
conf, timeBetweenThreadChecks);
TThreadPoolServer server = createTThreadPoolServer(transport, processor,
Expand Down Expand Up @@ -455,7 +466,8 @@ private static ServerAddress createSslThreadPoolServer(HostAndPort address, TPro
private static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor,
TProtocolFactory protocolFactory, long socketTimeout, SaslServerConnectionParams params,
final String serverName, final int numThreads, final long threadTimeOut,
final AccumuloConfiguration conf, long timeBetweenThreadChecks) throws TTransportException {
final AccumuloConfiguration conf, long timeBetweenThreadChecks, int backlog)
throws TTransportException {
// We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that the
// TThreadPoolServer does,
// but sadly this isn't the case. Because TSaslTransport needs to issue a handshake when it
Expand All @@ -465,7 +477,10 @@ private static ServerAddress createSaslThreadPoolServer(HostAndPort address, TPr
address.getPort());
InetSocketAddress isa = new InetSocketAddress(address.getHost(), address.getPort());
// Must use an ISA, providing only a port would ignore the hostname given
TServerSocket transport = new TServerSocket(isa, (int) socketTimeout);
ServerSocketTransportArgs args = new ServerSocketTransportArgs().backlog(backlog).bindAddr(isa)
.clientTimeout((int) socketTimeout);

TServerSocket transport = new TServerSocket(args);

String hostname, fqdn;
try {
Expand Down Expand Up @@ -550,7 +565,7 @@ public static ServerAddress startTServer(final AccumuloConfiguration conf,
ThriftServerType serverType, TProcessor processor, String serverName, String threadName,
int numThreads, long threadTimeOut, long timeBetweenThreadChecks, long maxMessageSize,
SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
long serverSocketTimeout, HostAndPort... addresses) {
long serverSocketTimeout, int backlog, HostAndPort... addresses) {

if (serverType == ThriftServerType.SASL) {
processor = updateSaslProcessor(serverType, processor);
Expand All @@ -559,7 +574,7 @@ public static ServerAddress startTServer(final AccumuloConfiguration conf,
try {
return startTServer(serverType, new TimedProcessor(processor), serverName, threadName,
numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, sslParams,
saslParams, serverSocketTimeout, addresses);
saslParams, serverSocketTimeout, backlog, addresses);
} catch (TTransportException e) {
throw new IllegalStateException(e);
}
Expand All @@ -576,7 +591,7 @@ private static ServerAddress startTServer(ThriftServerType serverType, TimedProc
String serverName, String threadName, int numThreads, long threadTimeOut,
final AccumuloConfiguration conf, long timeBetweenThreadChecks, long maxMessageSize,
SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
long serverSocketTimeout, HostAndPort... addresses) throws TTransportException {
long serverSocketTimeout, int backlog, HostAndPort... addresses) throws TTransportException {
TProtocolFactory protocolFactory = ThriftUtil.protocolFactory();
// This is presently not supported. It's hypothetically possible, I believe, to work, but it
// would require changes in how the transports
Expand All @@ -599,24 +614,24 @@ private static ServerAddress startTServer(ThriftServerType serverType, TimedProc
log.debug("Instantiating SASL Thrift server");
serverAddress = createSaslThreadPoolServer(address, processor, protocolFactory,
serverSocketTimeout, saslParams, serverName, numThreads, threadTimeOut, conf,
timeBetweenThreadChecks);
timeBetweenThreadChecks, backlog);
break;
case THREADPOOL:
log.debug("Instantiating unsecure TThreadPool Thrift server");
serverAddress =
createBlockingServer(address, processor, protocolFactory, maxMessageSize,
serverName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks);
serverName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, backlog);
break;
case THREADED_SELECTOR:
log.debug("Instantiating default, unsecure Threaded selector Thrift server");
serverAddress =
createThreadedSelectorServer(address, processor, protocolFactory, serverName,
numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize);
serverAddress = createThreadedSelectorServer(address, processor, protocolFactory,
serverName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks,
maxMessageSize, backlog);
break;
case CUSTOM_HS_HA:
log.debug("Instantiating unsecure custom half-async Thrift server");
serverAddress = createNonBlockingServer(address, processor, protocolFactory, serverName,
numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize);
numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, backlog);
break;
default:
throw new IllegalArgumentException("Unknown server type " + serverType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,8 @@ private HostAndPort startStatsService() {
ServerAddress server = TServerUtils.startTServer(getConfiguration(),
getContext().getThriftServerType(), processor, this.getClass().getSimpleName(),
"GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, maxMessageSize,
getContext().getServerSslParams(), getContext().getSaslParams(), 0, addresses);
getContext().getServerSslParams(), getContext().getSaslParams(), 0,
getConfiguration().getCount(Property.RPC_BACKLOG), addresses);
log.debug("Starting garbage collector listening on " + server.address);
return server.address;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.thrift.ClientService;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
Expand Down Expand Up @@ -120,10 +121,11 @@ public static void main(String[] args) throws Exception {
TabletScanClientService.Processor.class, TabletScanClientService.Iface.class, tch,
context));

ServerAddress serverPort =
TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA,
muxProcessor, "ZombieTServer", "walking dead", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS,
1000, 10 * 1024 * 1024, null, null, -1, HostAndPort.fromParts("0.0.0.0", port));
ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(),
ThriftServerType.CUSTOM_HS_HA, muxProcessor, "ZombieTServer", "walking dead", 2,
ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, null, -1,
context.getConfiguration().getCount(Property.RPC_BACKLOG),
HostAndPort.fromParts("0.0.0.0", port));

String addressString = serverPort.address.toString();
var zLockPath =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ public static void main(String[] args) throws Exception {

TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA,
muxProcessor, "NullTServer", "null tserver", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000,
10 * 1024 * 1024, null, null, -1, HostAndPort.fromParts("0.0.0.0", opts.port));
10 * 1024 * 1024, null, null, -1, context.getConfiguration().getCount(Property.RPC_BACKLOG),
HostAndPort.fromParts("0.0.0.0", opts.port));

HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port);

Expand Down