Skip to content

Commit

Permalink
Prevent connection races in testEnsureWeReconnect (#56847)
Browse files Browse the repository at this point in the history
Currently it is possible that a sniff connection round is occurring as
we enter another test loop in testEnsureWeReconnect. The problem is that
once we enter another loop, closing the connection manually can cause
this pre-existing connection round to fail. This round failing can fail
the test. This commit fixes the issue by ensuring that there are no
in-progress connections before entering another loop.
  • Loading branch information
Tim-Brooks committed May 15, 2020
1 parent 5f4411c commit 0b25ad3
Showing 1 changed file with 17 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -31,7 +32,6 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.transport.RemoteClusterConnectionTests.startTransport;
Expand Down Expand Up @@ -86,37 +86,27 @@ public void testEnsureWeReconnect() throws Exception {
.put("cluster.remote.test.seeds",
remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
Semaphore semaphore = new Semaphore(1);
service.start();
service.getRemoteClusterService().getConnections().forEach(con -> {
con.getConnectionManager().addListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
if (remoteNode.equals(node)) {
semaphore.release();
}
}
});
});
// this test is not perfect since we might reconnect concurrently but it will fail most of the time if we don't have
// the right calls in place in the RemoteAwareClient
service.acceptIncomingRequests();
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
assertBusy(() -> assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)));
for (int i = 0; i < 10; i++) {
semaphore.acquire();
try {
service.getRemoteClusterService().getConnections().forEach(con -> {
con.getConnectionManager().disconnectFromNode(remoteNode);
});
semaphore.acquire();
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test");
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get();
assertNotNull(clusterStateResponse);
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode));
} finally {
semaphore.release();
}
RemoteClusterConnection remoteClusterConnection = remoteClusterService.getRemoteClusterConnection("test");
assertBusy(remoteClusterConnection::assertNoRunningConnections);
ConnectionManager connectionManager = remoteClusterConnection.getConnectionManager();
Transport.Connection connection = connectionManager.getConnection(remoteNode);
PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
connection.addCloseListener(closeFuture);
connectionManager.disconnectFromNode(remoteNode);
closeFuture.get();

Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test");
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get();
assertNotNull(clusterStateResponse);
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
assertTrue(remoteClusterConnection.isNodeConnected(remoteNode));
}
}
}
Expand Down

0 comments on commit 0b25ad3

Please sign in to comment.