Skip to content

Commit

Permalink
Fail listener on exception in TcpTransport#openConnection
Browse files Browse the repository at this point in the history
Today `TcpTransport#openConnection` may throw exceptions on certain
kinds of failure, but other kinds of failure are passed to the listener.
This is trappy and not all callers handle it correctly. This commit
makes sure that all exceptions are passed to the listener.

Closes elastic#100510
  • Loading branch information
DaveCTurner committed Nov 8, 2023
1 parent ff28ac7 commit d0f35c2
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 17 deletions.
35 changes: 18 additions & 17 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -362,23 +362,24 @@ protected Recycler<BytesRef> createRecycler(Settings settings, PageCacheRecycler

@Override
public void openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Transport.Connection> listener) {

Objects.requireNonNull(profile, "connection profile cannot be null");
if (node == null) {
throw new ConnectTransportException(null, "can't open connection to a null node");
}
ConnectionProfile finalProfile = maybeOverrideConnectionProfile(profile);
if (closeLock.readLock().tryLock() == false) {
ensureOpen();
assert false : "should not get here ever because close-write-lock should only be held on shutdown";
throw new ConnectTransportException(node, "failed to acquire close-read-lock");
}
try {
ensureOpen();
initiateConnection(node, finalProfile, listener);
} finally {
closeLock.readLock().unlock();
}
ActionListener.run(listener, l -> {
Objects.requireNonNull(profile, "connection profile cannot be null");
if (node == null) {
throw new ConnectTransportException(null, "can't open connection to a null node");
}
final var finalProfile = maybeOverrideConnectionProfile(profile);
if (closeLock.readLock().tryLock() == false) {
ensureOpen();
assert false : "should not get here ever because close-write-lock should only be held on shutdown";
throw new ConnectTransportException(node, "failed to acquire close-read-lock");
}
try {
ensureOpen();
initiateConnection(node, finalProfile, l);
} finally {
closeLock.readLock().unlock();
}
});
}

private void initiateConnection(DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener<Connection> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.VersionInformation;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -881,6 +882,31 @@ public void handleException(TransportException exp) {
assertThat(e.getCause().getCause().getMessage(), equalTo("runtime_exception: bad message !!!"));
}

public void testExceptionOnConnect() {
final var transportA = serviceA.getOriginalTransport();

final var nullProfileFuture = new PlainActionFuture<Transport.Connection>();
transportA.openConnection(nodeB, null, nullProfileFuture);
assertTrue(nullProfileFuture.isDone());
expectThrows(ExecutionException.class, NullPointerException.class, nullProfileFuture::get);

final var profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
final var nullNodeFuture = new PlainActionFuture<Transport.Connection>();
transportA.openConnection(null, profile, nullNodeFuture);
assertTrue(nullNodeFuture.isDone());
expectThrows(ExecutionException.class, ConnectTransportException.class, nullNodeFuture::get);

serviceA.stop();
assertEquals(Lifecycle.State.STOPPED, transportA.lifecycleState());
serviceA.close();
assertEquals(Lifecycle.State.CLOSED, transportA.lifecycleState());

final var closedTransportFuture = new PlainActionFuture<Transport.Connection>();
transportA.openConnection(nodeB, profile, closedTransportFuture);
assertTrue(closedTransportFuture.isDone());
expectThrows(ExecutionException.class, IllegalStateException.class, closedTransportFuture::get);
}

public void testDisconnectListener() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
TransportConnectionListener disconnectListener = new TransportConnectionListener() {
Expand Down

0 comments on commit d0f35c2

Please sign in to comment.