diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 9bf773d083f5f..8612b5221c77b 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -362,23 +362,24 @@ protected Recycler createRecycler(Settings settings, PageCacheRecycler @Override public void openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener listener) { - - Objects.requireNonNull(profile, "connection profile cannot be null"); - if (node == null) { - throw new ConnectTransportException(null, "can't open connection to a null node"); - } - ConnectionProfile finalProfile = maybeOverrideConnectionProfile(profile); - if (closeLock.readLock().tryLock() == false) { - ensureOpen(); - assert false : "should not get here ever because close-write-lock should only be held on shutdown"; - throw new ConnectTransportException(node, "failed to acquire close-read-lock"); - } - try { - ensureOpen(); - initiateConnection(node, finalProfile, listener); - } finally { - closeLock.readLock().unlock(); - } + ActionListener.run(listener, l -> { + Objects.requireNonNull(profile, "connection profile cannot be null"); + if (node == null) { + throw new ConnectTransportException(null, "can't open connection to a null node"); + } + final var finalProfile = maybeOverrideConnectionProfile(profile); + if (closeLock.readLock().tryLock() == false) { + ensureOpen(); + assert false : "should not get here ever because close-write-lock should only be held on shutdown"; + throw new ConnectTransportException(node, "failed to acquire close-read-lock"); + } + try { + ensureOpen(); + initiateConnection(node, finalProfile, l); + } finally { + closeLock.readLock().unlock(); + } + }); } private void initiateConnection(DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener listener) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index f9085ec258627..ea9dd001e5ce8 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.VersionInformation; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -881,6 +882,31 @@ public void handleException(TransportException exp) { assertThat(e.getCause().getCause().getMessage(), equalTo("runtime_exception: bad message !!!")); } + public void testExceptionOnConnect() { + final var transportA = serviceA.getOriginalTransport(); + + final var nullProfileFuture = new PlainActionFuture(); + transportA.openConnection(nodeB, null, nullProfileFuture); + assertTrue(nullProfileFuture.isDone()); + expectThrows(ExecutionException.class, NullPointerException.class, nullProfileFuture::get); + + final var profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY); + final var nullNodeFuture = new PlainActionFuture(); + transportA.openConnection(null, profile, nullNodeFuture); + assertTrue(nullNodeFuture.isDone()); + expectThrows(ExecutionException.class, ConnectTransportException.class, nullNodeFuture::get); + + serviceA.stop(); + assertEquals(Lifecycle.State.STOPPED, transportA.lifecycleState()); + serviceA.close(); + assertEquals(Lifecycle.State.CLOSED, transportA.lifecycleState()); + + final var closedTransportFuture = new PlainActionFuture(); + transportA.openConnection(nodeB, profile, closedTransportFuture); + assertTrue(closedTransportFuture.isDone()); + expectThrows(ExecutionException.class, IllegalStateException.class, closedTransportFuture::get); + } + public void testDisconnectListener() throws Exception { final CountDownLatch latch = new CountDownLatch(1); TransportConnectionListener disconnectListener = new TransportConnectionListener() {