Use single netty event loop group for transports (#35181)
Currently we create a new netty event loop group for client connections
and for each server profile. Every group spawns its own threads for IO
processing, so each group adds 2 * (number of processors) threads. A
single group can handle all IO processing for the transports, and this
also brings the netty module in line with what we do for nio.

Additionally, this PR renames the worker threads so that netty and nio
use the same name.
Tim-Brooks committed Nov 2, 2018
1 parent 3c36ba1 commit 0166388
Showing 5 changed files with 21 additions and 47 deletions.
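The heart of the change is in Netty4Transport#doStart below: one NioEventLoopGroup is created up front and shared by the client bootstrap and every server profile's bootstrap, and stopInternal now shuts down just that one group. A self-contained sketch of the same pattern in plain Netty follows; the class name, port, and empty pipeline initializers are illustrative and not part of the commit:

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.concurrent.TimeUnit;

public class SharedEventLoopGroupSketch {

    public static void main(String[] args) {
        // One group drives IO for all transport channels: 2 * processors threads
        // total, instead of 2 * processors per client group and per server profile.
        NioEventLoopGroup eventLoopGroup =
            new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
        try {
            // Server side: the single-argument group(...) reuses one group for
            // accepting connections and for child channel IO.
            ServerBootstrap server = new ServerBootstrap()
                .group(eventLoopGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        // channel pipeline setup would go here
                    }
                });
            Channel serverChannel = server.bind("127.0.0.1", 9400).syncUninterruptibly().channel();

            // Client side: the same group, no dedicated "client boss" threads.
            Bootstrap client = new Bootstrap()
                .group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        // channel pipeline setup would go here
                    }
                });
            Channel clientChannel = client.connect("127.0.0.1", 9400).syncUninterruptibly().channel();

            clientChannel.close().syncUninterruptibly();
            serverChannel.close().syncUninterruptibly();
        } finally {
            // One graceful shutdown tears down every transport IO thread.
            eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly();
        }
    }
}

ServerBootstrap's single-argument group(...) uses the same group as both parent (accept) and child (IO) group, which is why no separate boss group is needed once the groups are merged.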
Netty4Transport.java
@@ -37,12 +37,10 @@
 import io.netty.util.AttributeKey;
 import io.netty.util.concurrent.Future;
 import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.logging.log4j.util.Supplier;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.SuppressForbidden;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.network.NetworkService;
@@ -59,8 +57,6 @@

 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -101,8 +97,9 @@ public class Netty4Transport extends TcpTransport {
     private final int workerCount;
     private final ByteSizeValue receivePredictorMin;
     private final ByteSizeValue receivePredictorMax;
-    private volatile Bootstrap clientBootstrap;
     private final Map<String, ServerBootstrap> serverBootstraps = newConcurrentMap();
+    private volatile Bootstrap clientBootstrap;
+    private volatile NioEventLoopGroup eventLoopGroup;
 
     public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
                            NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
@@ -125,10 +122,12 @@ public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService
     protected void doStart() {
         boolean success = false;
         try {
-            clientBootstrap = createClientBootstrap();
+            ThreadFactory threadFactory = daemonThreadFactory(settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX);
+            eventLoopGroup = new NioEventLoopGroup(workerCount, threadFactory);
+            clientBootstrap = createClientBootstrap(eventLoopGroup);
             if (NetworkService.NETWORK_SERVER.get(settings)) {
                 for (ProfileSettings profileSettings : profileSettings) {
-                    createServerBootstrap(profileSettings);
+                    createServerBootstrap(profileSettings, eventLoopGroup);
                     bindServer(profileSettings);
                 }
             }
@@ -141,9 +140,9 @@ protected void doStart() {
         }
     }
 
-    private Bootstrap createClientBootstrap() {
+    private Bootstrap createClientBootstrap(NioEventLoopGroup eventLoopGroup) {
         final Bootstrap bootstrap = new Bootstrap();
-        bootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)));
+        bootstrap.group(eventLoopGroup);
         bootstrap.channel(NioSocketChannel.class);
 
         bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
@@ -167,7 +166,7 @@ private Bootstrap createClientBootstrap() {
         return bootstrap;
     }
 
-    private void createServerBootstrap(ProfileSettings profileSettings) {
+    private void createServerBootstrap(ProfileSettings profileSettings, NioEventLoopGroup eventLoopGroup) {
         String name = profileSettings.profileName;
         if (logger.isDebugEnabled()) {
             logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], "
@@ -176,12 +175,9 @@ private void createServerBootstrap(ProfileSettings profileSettings) {
                 receivePredictorMin, receivePredictorMax);
         }
 
-
-        final ThreadFactory workerFactory = daemonThreadFactory(this.settings, TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, name);
-
         final ServerBootstrap serverBootstrap = new ServerBootstrap();
 
-        serverBootstrap.group(new NioEventLoopGroup(workerCount, workerFactory));
+        serverBootstrap.group(eventLoopGroup);
         serverBootstrap.channel(NioServerSocketChannel.class);
 
         serverBootstrap.childHandler(getServerChannelInitializer(name));
@@ -274,25 +270,14 @@ long failedPingCount() {
     @SuppressForbidden(reason = "debug")
     protected void stopInternal() {
         Releasables.close(() -> {
-            final List<Tuple<String, Future<?>>> serverBootstrapCloseFutures = new ArrayList<>(serverBootstraps.size());
-            for (final Map.Entry<String, ServerBootstrap> entry : serverBootstraps.entrySet()) {
-                serverBootstrapCloseFutures.add(
-                    Tuple.tuple(entry.getKey(), entry.getValue().config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS)));
-            }
-            for (final Tuple<String, Future<?>> future : serverBootstrapCloseFutures) {
-                future.v2().awaitUninterruptibly();
-                if (!future.v2().isSuccess()) {
-                    logger.debug(
-                        (Supplier<?>) () -> new ParameterizedMessage(
-                            "Error closing server bootstrap for profile [{}]", future.v1()), future.v2().cause());
-                }
-            }
-            serverBootstraps.clear();
-
-            if (clientBootstrap != null) {
-                clientBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly();
-                clientBootstrap = null;
-            }
+            Future<?> shutdownFuture = eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
+            shutdownFuture.awaitUninterruptibly();
+            if (shutdownFuture.isSuccess() == false) {
+                logger.warn("Error closing netty event loop group", shutdownFuture.cause());
+            }
+
+            serverBootstraps.clear();
+            clientBootstrap = null;
         });
     }

NioTransport.java
@@ -41,7 +41,6 @@
 import org.elasticsearch.nio.ServerChannelContext;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TcpTransport;
-import org.elasticsearch.transport.Transports;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -57,8 +56,6 @@

 public class NioTransport extends TcpTransport {
 
-    private static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX;
-
     public static final Setting<Integer> NIO_WORKER_COUNT =
         new Setting<>("transport.nio.worker_count",
             (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2),
@@ -94,7 +91,7 @@ protected NioTcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void>
     protected void doStart() {
         boolean success = false;
         try {
-            nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX),
+            nioGroup = new NioGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX),
                 NioTransport.NIO_WORKER_COUNT.get(settings), (s) -> new EventHandler(this::onNonChannelException, s));
 
             ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");
TcpTransport.java
@@ -118,8 +118,7 @@

 public abstract class TcpTransport extends AbstractLifecycleComponent implements Transport {
 
-    public static final String TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX = "transport_server_worker";
-    public static final String TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX = "transport_client_boss";
+    public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker";
 
     public static final Setting<List<String>> HOST =
         listSetting("transport.host", emptyList(), Function.identity(), Setting.Property.NodeScope);
Transports.java
@@ -29,8 +29,6 @@ public enum Transports {
     /** threads whose name is prefixed by this string will be considered network threads, even though they aren't */
     public static final String TEST_MOCK_TRANSPORT_THREAD_PREFIX = "__mock_network_thread";
 
-    public static final String NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX = "es_nio_transport_worker";
-
     /**
      * Utility method to detect whether a thread is a network thread. Typically
      * used in assertions to make sure that we do not call blocking code from
@@ -41,10 +39,8 @@ public static boolean isTransportThread(Thread t) {
         for (String s : Arrays.asList(
             HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX,
             HttpServerTransport.HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX,
-            TcpTransport.TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX,
-            TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX,
-            TEST_MOCK_TRANSPORT_THREAD_PREFIX,
-            NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX)) {
+            TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
+            TEST_MOCK_TRANSPORT_THREAD_PREFIX)) {
             if (threadName.contains(s)) {
                 return true;
             }
MockNioTransport.java
@@ -46,7 +46,6 @@
 import org.elasticsearch.transport.TcpServerChannel;
 import org.elasticsearch.transport.TcpTransport;
 import org.elasticsearch.transport.TransportRequestOptions;
-import org.elasticsearch.transport.Transports;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -65,8 +64,6 @@

 public class MockNioTransport extends TcpTransport {
 
-    private static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX;
-
     private final PageCacheRecycler pageCacheRecycler;
     private final ConcurrentMap<String, MockTcpChannelFactory> profileToChannelFactory = newConcurrentMap();
     private volatile NioGroup nioGroup;
@@ -97,7 +94,7 @@ protected MockSocketChannel initiateChannel(DiscoveryNode node, ActionListener<V
     protected void doStart() {
         boolean success = false;
         try {
-            nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2,
+            nioGroup = new NioGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2,
                 (s) -> new TestingSocketEventHandler(this::onNonChannelException, s));
 
             ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");
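A consequence of the rename is that netty and nio transport threads now share the single transport_worker prefix, which is what lets Transports.isTransportThread shrink to one TCP prefix plus the mock and HTTP ones. A minimal sketch of that check follows; the constants mirror the diff, while the example thread name is only an illustration of Elasticsearch's daemonThreadFactory naming style and is not taken from the commit:

import java.util.Arrays;

public class TransportThreadCheckSketch {

    // Mirrors TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX after this commit.
    static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker";
    // Mirrors Transports.TEST_MOCK_TRANSPORT_THREAD_PREFIX.
    static final String TEST_MOCK_TRANSPORT_THREAD_PREFIX = "__mock_network_thread";

    // Simplified version of Transports.isTransportThread (HTTP prefixes omitted).
    static boolean isTransportThread(Thread t) {
        String threadName = t.getName();
        for (String s : Arrays.asList(TRANSPORT_WORKER_THREAD_NAME_PREFIX, TEST_MOCK_TRANSPORT_THREAD_PREFIX)) {
            if (threadName.contains(s)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Hypothetical worker thread name in the daemonThreadFactory style.
        Thread worker = new Thread(() -> {}, "elasticsearch[node-0][transport_worker][T#1]");
        System.out.println(isTransportThread(worker));                 // true
        System.out.println(isTransportThread(Thread.currentThread())); // false
    }
}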
