diff --git a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index b403a30ecd8aa..97df190597561 100644 --- a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -186,15 +186,25 @@ public TransportClientNodesService removeTransportAddress(TransportAddress trans if (closed) { throw new IllegalStateException("transport client is closed, can't remove an address"); } - List builder = new ArrayList<>(); + List listNodesBuilder = new ArrayList<>(); for (DiscoveryNode otherNode : listedNodes) { if (!otherNode.getAddress().equals(transportAddress)) { - builder.add(otherNode); + listNodesBuilder.add(otherNode); } else { - logger.debug("removing address [{}]", otherNode); + logger.debug("removing address [{}] from listed nodes", otherNode); } } - listedNodes = Collections.unmodifiableList(builder); + listedNodes = Collections.unmodifiableList(listNodesBuilder); + List nodesBuilder = new ArrayList<>(); + for (DiscoveryNode otherNode : nodes) { + if (!otherNode.getAddress().equals(transportAddress)) { + nodesBuilder.add(otherNode); + } else { + logger.debug("disconnecting from node with address [{}]", otherNode); + transportService.disconnectFromNode(otherNode); + } + } + nodes = Collections.unmodifiableList(nodesBuilder); nodesSampler.sample(); } return this; diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 16c1842adcab2..60cdaf7e9785a 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -547,6 +547,8 @@ protected void doRun() throws Exception { holderToNotify.handler().handleException(sendRequestException); } }); + } else { + logger.debug("Exception while sending request, handler likely already notified due to timeout", e); } } } diff --git a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index e94b0b0c8d67a..908d2eb6d1edc 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -20,8 +20,13 @@ package org.elasticsearch.client.transport; import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse; +import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; +import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Settings; @@ -64,6 +69,8 @@ abstract class FailAndRetryMockTransport imp this.clusterName = clusterName; } + protected abstract ClusterState getMockClusterState(DiscoveryNode node); + @Override @SuppressWarnings("unchecked") public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) @@ -71,9 +78,17 @@ public void sendRequest(DiscoveryNode node, long requestId, String action, Trans //we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info if (connectMode) { - TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); - transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY), - node)); + if (TransportLivenessAction.NAME.equals(action)) { + TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); + transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY), + node)); + } else if (ClusterStateAction.NAME.equals(action)) { + TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); + ClusterState clusterState = getMockClusterState(node); + transportResponseHandler.handleResponse(new ClusterStateResponse(clusterName, clusterState)); + } else { + throw new UnsupportedOperationException("Mock transport does not understand action " + action); + } return; } diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index cd6cd251b34ed..f513660de95a7 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -19,37 +19,44 @@ package org.elasticsearch.client.transport; +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse; import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportInterceptor; -import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; - -import java.io.Closeable; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import org.hamcrest.CustomMatcher; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.everyItem; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -62,14 +69,44 @@ private static class TestIteration implements Closeable { private final FailAndRetryMockTransport transport; private final TransportService transportService; private final TransportClientNodesService transportClientNodesService; - private final int nodesCount; + private final int listNodesCount; + private final int sniffNodesCount; private TransportAddress livenessAddress = buildNewFakeTransportAddress(); - public Set nodeAddresses = new HashSet<>(); + final List listNodeAddresses; + // map for each address of the nodes a cluster state request should respond with + final Map nodeMap; - TestIteration() { - Settings settings = Settings.builder().put("cluster.name", "test").build(); + TestIteration(Object... extraSettings) { + Settings settings = Settings.builder().put(extraSettings).put("cluster.name", "test").build(); ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); + List listNodes = new ArrayList<>(); + Map nodeMap = new HashMap<>(); + this.listNodesCount = randomIntBetween(1, 10); + int sniffNodesCount = 0; + for (int i = 0; i < listNodesCount; i++) { + TransportAddress transportAddress = buildNewFakeTransportAddress(); + listNodes.add(transportAddress); + DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); + discoNodes.add(new DiscoveryNode("#list-node#-" + transportAddress, transportAddress, Version.CURRENT)); + + if (TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings)) { + final int numSniffNodes = randomIntBetween(0, 3); + for (int j = 0; j < numSniffNodes; ++j) { + TransportAddress sniffAddress = buildNewFakeTransportAddress(); + DiscoveryNode sniffNode = new DiscoveryNode("#sniff-node#-" + sniffAddress, sniffAddress, Version.CURRENT); + discoNodes.add(sniffNode); + // also allow sniffing of the sniff node itself + nodeMap.put(sniffAddress, DiscoveryNodes.builder().add(sniffNode).build()); + ++sniffNodesCount; + } + } + nodeMap.put(transportAddress, discoNodes.build()); + } + listNodeAddresses = listNodes; + this.nodeMap = nodeMap; + this.sniffNodesCount = sniffNodesCount; + threadPool = new TestThreadPool("transport-client-nodes-service-tests"); transport = new FailAndRetryMockTransport(random(), clusterName) { @Override @@ -79,7 +116,12 @@ public List getLocalAddresses() { @Override protected TestResponse newResponse() { - return new TestResponse(); + return new TestResponse(); + } + + @Override + protected ClusterState getMockClusterState(DiscoveryNode node) { + return ClusterState.builder(clusterName).nodes(TestIteration.this.nodeMap.get(node.getAddress())).build(); } }; transportService = new TransportService(settings, transport, threadPool, new TransportInterceptor() { @@ -101,14 +143,8 @@ public void sendRequest(DiscoveryNode node, String transportService.start(); transportService.acceptIncomingRequests(); transportClientNodesService = - new TransportClientNodesService(settings, transportService, threadPool, (a, b) -> {}); - this.nodesCount = randomIntBetween(1, 10); - for (int i = 0; i < nodesCount; i++) { - TransportAddress transportAddress = buildNewFakeTransportAddress(); - nodeAddresses.add(transportAddress); - transportClientNodesService.addTransportAddresses(transportAddress); - } - transport.endConnectMode(); + new TransportClientNodesService(settings, transportService, threadPool, (a, b) -> {}); + transportClientNodesService.addTransportAddresses(listNodeAddresses.toArray(new TransportAddress[0])); } private TransportResponseHandler wrapLivenessResponseHandler(TransportResponseHandler handler, @@ -145,7 +181,7 @@ public String executor() { @Override public void close() { - + transport.endConnectMode(); transportService.stop(); transportClientNodesService.close(); try { @@ -160,6 +196,7 @@ public void testListenerFailures() throws InterruptedException { int iters = iterations(10, 100); for (int i = 0; i finalFailure = new AtomicReference<>(); @@ -230,7 +267,7 @@ public String executor() { } } - assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.nodesCount)); + assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.listNodesCount)); assertThat(iteration.transport.triedNodes().size(), equalTo(iteration.transport.connectTransportExceptions() + iteration.transport.failures() + iteration.transport.successes())); } @@ -241,17 +278,42 @@ public void testConnectedNodes() { int iters = iterations(10, 100); for (int i = 0; i ("removed address") { + @Override + public boolean matches(Object item) { + return item instanceof DiscoveryNode && ((DiscoveryNode)item).getAddress().equals(addressToRemove); + } + }))); + assertEquals(iteration.listNodesCount + iteration.sniffNodesCount - 1, service.connectedNodes().size()); + } + } + public static class TestRequest extends TransportRequest { }