Skip to content

Commit

Permalink
Revert "Fail fast remote cluster requests (#80589)"
Browse files Browse the repository at this point in the history
This reverts commit 8ce950b.
  • Loading branch information
ywelsch committed Dec 2, 2021
1 parent 8ce950b commit b1130cf
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private <Request extends ActionRequest, Response extends ActionResponse> Transpo

@Override
public Client getRemoteClusterClient(String clusterAlias) {
return remoteClusterService.getRemoteClusterClient(threadPool(), clusterAlias, true);
return remoteClusterService.getRemoteClusterClient(threadPool(), clusterAlias);
}

public NamedWriteableRegistry getNamedWriteableRegistry() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,12 @@ final class RemoteClusterAwareClient extends AbstractClient {
private final TransportService service;
private final String clusterAlias;
private final RemoteClusterService remoteClusterService;
private final boolean ensureConnected;

RemoteClusterAwareClient(
Settings settings,
ThreadPool threadPool,
TransportService service,
String clusterAlias,
boolean ensureConnected
) {
RemoteClusterAwareClient(Settings settings, ThreadPool threadPool, TransportService service, String clusterAlias) {
super(settings, threadPool);
this.service = service;
this.clusterAlias = clusterAlias;
this.remoteClusterService = service.getRemoteClusterService();
this.ensureConnected = ensureConnected;
}

@Override
Expand All @@ -45,21 +37,13 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
Request request,
ActionListener<Response> listener
) {
maybeEnsureConnected(ActionListener.wrap(v -> {
final Transport.Connection connection;
try {
if (request instanceof RemoteClusterAwareRequest) {
DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode();
connection = remoteClusterService.getConnection(preferredTargetNode, clusterAlias);
} else {
connection = remoteClusterService.getConnection(clusterAlias);
}
} catch (NoSuchRemoteClusterException e) {
if (ensureConnected == false) {
// trigger another connection attempt, but don't wait for it to complete
remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(() -> {}));
}
throw e;
remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(v -> {
Transport.Connection connection;
if (request instanceof RemoteClusterAwareRequest) {
DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode();
connection = remoteClusterService.getConnection(preferredTargetNode, clusterAlias);
} else {
connection = remoteClusterService.getConnection(clusterAlias);
}
service.sendRequest(
connection,
Expand All @@ -71,14 +55,6 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
}, listener::onFailure));
}

private void maybeEnsureConnected(ActionListener<Void> ensureConnectedListener) {
if (ensureConnected) {
remoteClusterService.ensureConnected(clusterAlias, ensureConnectedListener);
} else {
ensureConnectedListener.onResponse(null);
}
}

@Override
public void close() {
// do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,12 +474,11 @@ public void onFailure(Exception e) {
/**
* Returns a client to the remote cluster if the given cluster alias exists.
*
* @param threadPool the {@link ThreadPool} for the client
* @param clusterAlias the cluster alias the remote cluster is registered under
* @param ensureConnected whether requests should wait for a connection attempt when there isn't a connection available
* @param threadPool the {@link ThreadPool} for the client
* @param clusterAlias the cluster alias the remote cluster is registered under
* @throws IllegalArgumentException if the given clusterAlias doesn't exist
*/
public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias, boolean ensureConnected) {
public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) {
if (transportService.getRemoteClusterService().isEnabled() == false) {
throw new IllegalArgumentException(
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"
Expand All @@ -488,22 +487,7 @@ public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias,
if (transportService.getRemoteClusterService().getRemoteClusterNames().contains(clusterAlias) == false) {
throw new NoSuchRemoteClusterException(clusterAlias);
}
return new RemoteClusterAwareClient(settings, threadPool, transportService, clusterAlias, ensureConnected);
}

/**
* Returns a client to the remote cluster if the given cluster alias exists.
*
* @param threadPool the {@link ThreadPool} for the client
* @param clusterAlias the cluster alias the remote cluster is registered under
* @throws IllegalArgumentException if the given clusterAlias doesn't exist
*/
public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) {
return getRemoteClusterClient(
threadPool,
clusterAlias,
transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias) == false
);
return new RemoteClusterAwareClient(settings, threadPool, transportService, clusterAlias);
}

Collection<RemoteClusterConnection> getConnections() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,8 +717,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti

}
}
// put the following in assert busy as connections are lazily reestablished
assertBusy(() -> {
{
SearchRequest searchRequest = new SearchRequest();
final CountDownLatch latch = new CountDownLatch(1);
SetOnce<Tuple<SearchRequest, ActionListener<SearchResponse>>> setOnce = new SetOnce<>();
Expand Down Expand Up @@ -754,7 +753,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
assertEquals(totalClusters, searchResponse.getClusters().getTotal());
assertEquals(totalClusters, searchResponse.getClusters().getSuccessful());
assertEquals(totalClusters == 1 ? 1 : totalClusters + 1, searchResponse.getNumReducePhases());
});
}
assertEquals(0, service.getConnectionManager().size());
} finally {
for (MockTransportService mockTransportService : mockTransportServices) {
Expand Down Expand Up @@ -914,8 +913,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti

}
}
// run the following under assertBusy as connections are lazily reestablished
assertBusy(() -> {
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicInteger skippedClusters = new AtomicInteger(0);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
Expand All @@ -939,7 +937,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
assertTrue(map.containsKey(clusterAlias));
assertNotNull(map.get(clusterAlias));
}
});
}
assertEquals(0, service.getConnectionManager().size());
} finally {
for (MockTransportService mockTransportService : mockTransportServices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,7 @@ public void testSearchShards() throws Exception {
service.start();
service.acceptIncomingRequests();

try (
RemoteClusterAwareClient client = new RemoteClusterAwareClient(
Settings.EMPTY,
threadPool,
service,
"cluster1",
randomBoolean()
)
) {
try (RemoteClusterAwareClient client = new RemoteClusterAwareClient(Settings.EMPTY, threadPool, service, "cluster1")) {
SearchRequest request = new SearchRequest("test-index");
CountDownLatch responseLatch = new CountDownLatch(1);
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
Expand Down Expand Up @@ -109,15 +101,7 @@ public void testSearchShardsThreadContextHeader() {
service.start();
service.acceptIncomingRequests();

try (
RemoteClusterAwareClient client = new RemoteClusterAwareClient(
Settings.EMPTY,
threadPool,
service,
"cluster1",
randomBoolean()
)
) {
try (RemoteClusterAwareClient client = new RemoteClusterAwareClient(Settings.EMPTY, threadPool, service, "cluster1")) {
SearchRequest request = new SearchRequest("test-index");
int numThreads = 10;
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.test.NodeRoles.onlyRole;
Expand Down Expand Up @@ -65,7 +63,7 @@ public void testConnectAndExecuteRequest() throws Exception {
logger.info("now accepting incoming requests on local transport");
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode));
Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test", randomBoolean());
Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test");
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get();
assertNotNull(clusterStateResponse);
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
Expand Down Expand Up @@ -116,7 +114,7 @@ public void testEnsureWeReconnect() throws Exception {
connectionManager.disconnectFromNode(remoteNode);
closeFuture.get();

Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test", true);
Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test");
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get();
assertNotNull(clusterStateResponse);
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
Expand All @@ -134,69 +132,10 @@ public void testRemoteClusterServiceNotEnabled() {
final RemoteClusterService remoteClusterService = service.getRemoteClusterService();
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> remoteClusterService.getRemoteClusterClient(threadPool, "test", randomBoolean())
() -> remoteClusterService.getRemoteClusterClient(threadPool, "test")
);
assertThat(e.getMessage(), equalTo("this node does not have the remote_cluster_client role"));
}
}

public void testQuicklySkipUnavailableClusters() throws Exception {
Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster").build();
try (
MockTransportService remoteTransport = startTransport(
"remote_node",
Collections.emptyList(),
Version.CURRENT,
threadPool,
remoteSettings
)
) {
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();

Settings localSettings = Settings.builder()
.put(onlyRole(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE))
.put("cluster.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort())
.put("cluster.remote.test.skip_unavailable", true)
.put("cluster.remote.initial_connect_timeout", "0s")
.build();
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
CountDownLatch latch = new CountDownLatch(1);
service.addConnectBehavior(remoteTransport, (transport, discoveryNode, profile, listener) -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
listener.onFailure(new ConnectTransportException(discoveryNode, "simulated"));
});
service.start();
service.acceptIncomingRequests();
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test");

try {
assertFalse(remoteClusterService.isRemoteNodeConnected("test", remoteNode));

// check that we quickly fail
expectThrows(
NoSuchRemoteClusterException.class,
() -> client.admin().cluster().prepareState().get(TimeValue.timeValueSeconds(10))
);
} finally {
service.clearAllRules();
latch.countDown();
}

assertBusy(() -> {
try {
client.admin().cluster().prepareState().get();
} catch (NoSuchRemoteClusterException e) {
// keep retrying on this exception, the goal is to check that we eventually reconnect
throw new AssertionError(e);
}
});
assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode));
}
}
}
}

0 comments on commit b1130cf

Please sign in to comment.