Avoid Needless Forking when Closing Transports (#66834)
No need to fork off in the changed spots if we block the calling thread anyway.
Also, some other minor cleanups.
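To make the intent concrete, here is a standalone sketch (an editor's illustration, not code from this commit) of the two shapes: the removed pattern forks the close work onto a generic pool and then blocks the calling thread on a latch anyway, while the new pattern simply runs the same work on the calling thread. The closeChannelsAndStop helper is hypothetical and stands in for the real channel-closing logic in TcpTransport#doStop.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ForkVsBlockSketch {

    // Hypothetical stand-in for the real channel-closing work in TcpTransport#doStop.
    static void closeChannelsAndStop() {
        System.out.println("closing channels on " + Thread.currentThread().getName());
    }

    // Removed shape: fork the work onto a generic pool, then block the caller on a latch anyway.
    static void stopByForking(ExecutorService genericPool) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        genericPool.execute(() -> {
            try {
                closeChannelsAndStop();
            } finally {
                latch.countDown();
            }
        });
        latch.await(30, TimeUnit.SECONDS); // the calling thread blocks regardless
    }

    // New shape: the caller is allowed to block here, so run the same work inline.
    static void stopInline() {
        closeChannelsAndStop();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService genericPool = Executors.newSingleThreadExecutor();
        stopByForking(genericPool);
        stopInline();
        genericPool.shutdown();
    }
}

Either way the caller waits for the close to finish; dropping the hop to the generic pool just removes a needless thread handoff and the latch.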
original-brownbear committed Sep 10, 2021
1 parent 5db3d9f commit 0d54332
Showing 7 changed files with 59 additions and 110 deletions.
@@ -119,7 +119,6 @@ protected void doStart() {
                     bindServer(profileSettings);
                 }
             }
-            super.doStart();
             success = true;
         } finally {
             if (success == false) {

@@ -95,7 +95,6 @@ protected void doStart() {
                 }
             }

-            super.doStart();
             success = true;
         } catch (IOException e) {
             throw new ElasticsearchException(e);

@@ -38,11 +38,6 @@ public long getMessagesReceived() {
         return messagesReceived.sum();
     }

-
-    public MeanMetric getWriteBytes() {
-        return writeBytesMetric;
-    }
-
     public long getBytesWritten() {
         return writeBytesMetric.sum();
     }

66 changes: 23 additions & 43 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
@@ -25,7 +25,6 @@
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.metrics.MeanMetric;
 import org.elasticsearch.common.network.CloseableChannel;
 import org.elasticsearch.common.network.NetworkAddress;
 import org.elasticsearch.common.network.NetworkService;
@@ -66,8 +65,6 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -162,10 +159,6 @@ public Supplier<CircuitBreaker> getInflightBreaker() {
         return () -> circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
     }

-    @Override
-    protected void doStart() {
-    }
-
     @Override
     public synchronized void setMessageListener(TransportMessageListener listener) {
         outboundHandler.setMessageListener(listener);
@@ -288,8 +281,8 @@ public void openConnection(DiscoveryNode node, ConnectionProfile profile, Action
         }
     }

-    private List<TcpChannel> initiateConnection(DiscoveryNode node, ConnectionProfile connectionProfile,
-                                                ActionListener<Transport.Connection> listener) {
+    private void initiateConnection(DiscoveryNode node, ConnectionProfile connectionProfile,
+                                    ActionListener<Connection> listener) {
         int numConnections = connectionProfile.getNumConnections();
         assert numConnections > 0 : "A connection profile must be configured with at least one connection";

@@ -303,11 +296,11 @@ private List<TcpChannel> initiateConnection(DiscoveryNode node, ConnectionProfil
             } catch (ConnectTransportException e) {
                 CloseableChannel.closeChannels(channels, false);
                 listener.onFailure(e);
-                return channels;
+                return;
             } catch (Exception e) {
                 CloseableChannel.closeChannels(channels, false);
                 listener.onFailure(new ConnectTransportException(node, "general node connection failure", e));
-                return channels;
+                return;
             }
         }

@@ -320,7 +313,6 @@ private List<TcpChannel> initiateConnection(DiscoveryNode node, ConnectionProfil

         TimeValue connectTimeout = connectionProfile.getConnectTimeout();
         threadPool.schedule(channelsConnectedListener::onTimeout, connectTimeout, ThreadPool.Names.GENERIC);
-        return channels;
     }

     @Override
@@ -559,42 +551,31 @@ protected final void doClose() {

     @Override
     protected final void doStop() {
-        final CountDownLatch latch = new CountDownLatch(1);
-        // make sure we run it on another thread than a possible IO handler thread
-        threadPool.generic().execute(() -> {
-            closeLock.writeLock().lock();
-            try {
-                keepAlive.close();
+        assert Transports.assertNotTransportThread("Must not block transport thread that might be needed for closing channels below");
+        assert threadPool.generic().isShutdown() == false : "Must stop transport before terminating underlying threadpool";
+        closeLock.writeLock().lock();
+        try {
+            keepAlive.close();

-                // first stop to accept any incoming connections so nobody can connect to this transport
-                for (Map.Entry<String, List<TcpServerChannel>> entry : serverChannels.entrySet()) {
-                    String profile = entry.getKey();
-                    List<TcpServerChannel> channels = entry.getValue();
-                    ActionListener<Void> closeFailLogger = ActionListener.wrap(c -> {
+            // first stop to accept any incoming connections so nobody can connect to this transport
+            for (Map.Entry<String, List<TcpServerChannel>> entry : serverChannels.entrySet()) {
+                String profile = entry.getKey();
+                List<TcpServerChannel> channels = entry.getValue();
+                ActionListener<Void> closeFailLogger = ActionListener.wrap(c -> {
                     },
                     e -> logger.warn(() -> new ParameterizedMessage("Error closing serverChannel for profile [{}]", profile), e));
-                    channels.forEach(c -> c.addCloseListener(closeFailLogger));
-                    CloseableChannel.closeChannels(channels, true);
-                }
-                serverChannels.clear();
-
-                // close all of the incoming channels. The closeChannels method takes a list so we must convert the set.
-                CloseableChannel.closeChannels(new ArrayList<>(acceptedChannels), true);
-                acceptedChannels.clear();
-
-                stopInternal();
-            } finally {
-                closeLock.writeLock().unlock();
-                latch.countDown();
-            }
-        });
-
-        try {
-            latch.await(30, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            // ignore
+                channels.forEach(c -> c.addCloseListener(closeFailLogger));
+                CloseableChannel.closeChannels(channels, true);
+            }
+            serverChannels.clear();
+
+            // close all of the incoming channels. The closeChannels method takes a list so we must convert the set.
+            CloseableChannel.closeChannels(new ArrayList<>(acceptedChannels), true);
+            acceptedChannels.clear();
+
+            stopInternal();
+        } finally {
+            closeLock.writeLock().unlock();
         }
     }

@@ -845,7 +826,6 @@ private void ensureOpen() {

     @Override
     public final TransportStats getStats() {
-        final MeanMetric writeBytesMetric = statsTracker.getWriteBytes();
         final long bytesWritten = statsTracker.getBytesWritten();
         final long messagesSent = statsTracker.getMessagesSent();
         final long messagesReceived = statsTracker.getMessagesReceived();

@@ -32,7 +32,6 @@
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.node.NodeClosedException;
@@ -52,7 +51,6 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -201,15 +199,6 @@ protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool
         return new TaskManager(settings, threadPool, taskHeaders);
     }

-    /**
-     * The executor service for this transport service.
-     *
-     * @return the executor service
-     */
-    private ExecutorService getExecutorService() {
-        return threadPool.generic();
-    }
-
     void setTracerLogInclude(List<String> tracerLogInclude) {
         this.tracerLogInclude = tracerLogInclude.toArray(Strings.EMPTY_ARRAY);
     }
@@ -247,33 +236,14 @@ protected void doStop() {
             // in case the transport is not connected to our local node (thus cleaned on node disconnect)
             // make sure to clean any leftover on going handles
             for (final Transport.ResponseContext<?> holderToNotify : responseHandlers.prune(h -> true)) {
-                // callback that an exception happened, but on a different thread since we don't
-                // want handlers to worry about stack overflows
-                getExecutorService().execute(new AbstractRunnable() {
-                    @Override
-                    public void onRejection(Exception e) {
-                        // if we get rejected during node shutdown we don't wanna bubble it up
-                        logger.debug(
-                            () -> new ParameterizedMessage(
-                                "failed to notify response handler on rejection, action: {}",
-                                holderToNotify.action()),
-                            e);
-                    }
-                    @Override
-                    public void onFailure(Exception e) {
-                        logger.warn(
-                            () -> new ParameterizedMessage(
-                                "failed to notify response handler on exception, action: {}",
-                                holderToNotify.action()),
-                            e);
-                    }
-                    @Override
-                    public void doRun() {
-                        TransportException ex = new SendRequestTransportException(holderToNotify.connection().getNode(),
-                            holderToNotify.action(), new NodeClosedException(localNode));
-                        holderToNotify.handler().handleException(ex);
-                    }
-                });
+                try {
+                    holderToNotify.handler().handleException(new SendRequestTransportException(holderToNotify.connection().getNode(),
+                        holderToNotify.action(), new NodeClosedException(localNode)));
+                } catch (Exception e) {
+                    assert false : e;
+                    logger.warn(() -> new ParameterizedMessage("failed to notify response handler on exception, action: {}",
+                        holderToNotify.action()), e);
+                }
             }
         }
     }
@@ -1023,29 +993,33 @@ private void checkForTimeout(long requestId) {

     @Override
     public void onConnectionClosed(Transport.Connection connection) {
-        try {
-            List<Transport.ResponseContext<? extends TransportResponse>> pruned =
-                responseHandlers.prune(h -> h.connection().getCacheKey().equals(connection.getCacheKey()));
-            // callback that an exception happened, but on a different thread since we don't
-            // want handlers to worry about stack overflows
-            getExecutorService().execute(new Runnable() {
-                @Override
-                @SuppressWarnings("rawtypes")
-                public void run() {
-                    for (Transport.ResponseContext holderToNotify : pruned) {
-                        holderToNotify.handler().handleException(
-                            new NodeDisconnectedException(connection.getNode(), holderToNotify.action()));
-                    }
-                }
-                @Override
-                public String toString() {
-                    return "onConnectionClosed(" + connection.getNode() + ")";
-                }
-            });
-        } catch (EsRejectedExecutionException ex) {
-            logger.debug("Rejected execution on onConnectionClosed", ex);
-        }
+        List<Transport.ResponseContext<? extends TransportResponse>> pruned =
+            responseHandlers.prune(h -> h.connection().getCacheKey().equals(connection.getCacheKey()));
+        if (pruned.isEmpty()) {
+            return;
+        }
+
+        // callback that an exception happened, but on a different thread since we don't
+        // want handlers to worry about stack overflows
+        threadPool.generic().execute(new AbstractRunnable() {
+            @Override
+            public void doRun() {
+                for (Transport.ResponseContext<?> holderToNotify : pruned) {
+                    holderToNotify.handler().handleException(new NodeDisconnectedException(connection.getNode(), holderToNotify.action()));
+                }
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                assert false : e;
+                logger.warn(() -> new ParameterizedMessage("failed to notify response handler on connection close [{}]", connection), e);
+            }
+
+            @Override
+            public String toString() {
+                return "onConnectionClosed(" + connection.getNode() + ")";
+            }
+        });
     }

     final class TimeoutHandler implements Runnable {

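A note on the onConnectionClosed change above: the explicit catch of EsRejectedExecutionException can be dropped because the notification is now submitted as an AbstractRunnable, which, as the diff's onFailure override suggests, reports rejections and failures through its callbacks instead of letting them escape. The sketch below is an editor's illustration of that contract only, not the actual org.elasticsearch.common.util.concurrent.AbstractRunnable class or Elasticsearch's thread pools; SafeRunnable and the execute helper are made-up names. It shows the idea: a rejected submission ends up in onFailure rather than propagating out of execute().

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class NotifyHandlersSketch {

    // Sketch of an AbstractRunnable-style task: doRun/onFailure, with rejection folded into failure.
    abstract static class SafeRunnable implements Runnable {
        protected abstract void doRun() throws Exception;

        public abstract void onFailure(Exception e);

        // By default, a rejected submission is handled like any other failure.
        public void onRejection(Exception e) {
            onFailure(e);
        }

        @Override
        public final void run() {
            try {
                doRun();
            } catch (Exception e) {
                onFailure(e);
            }
        }
    }

    // Submit helper that never lets a rejection escape to the caller.
    static void execute(ExecutorService pool, SafeRunnable task) {
        try {
            pool.execute(task);
        } catch (RejectedExecutionException e) {
            task.onRejection(e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown(); // force a rejection to demonstrate the failure path
        execute(pool, new SafeRunnable() {
            @Override
            protected void doRun() {
                System.out.println("notifying pruned response handlers");
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("failed to notify handlers: " + e);
            }
        });
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
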
@@ -167,6 +167,10 @@ private void testDefaultSeedAddresses(final Settings settings, Matcher<Iterable<
         final TcpTransport tcpTransport = new TcpTransport(settings, Version.CURRENT, testThreadPool,
             new MockPageCacheRecycler(settings),
             new NoneCircuitBreakerService(), writableRegistry(), new NetworkService(Collections.emptyList())) {
+            @Override
+            protected void doStart() {
+                throw new UnsupportedOperationException();
+            }

             @Override
             protected TcpServerChannel bind(String name, InetSocketAddress address) {

@@ -122,8 +122,6 @@ protected void doStart() {
                     bindServer(profileSettings);
                 }
             }
-
-            super.doStart();
             success = true;
         } catch (IOException e) {
             throw new ElasticsearchException(e);

0 comments on commit 0d54332
