From 8a69709911152ab0caa820146acfc244cc4d5cbe Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Tue, 22 Nov 2016 14:41:13 -0800 Subject: [PATCH 1/2] Transport client: Fix remove address to actually work The removeTransportAddress method of TransportClient removes the address from the list of nodes that the client pings to sniff for nodes. However, it does not remove it from the list of existing connected nodes. This means removing a node is not possible, as long as that node is still up. This change removes the node from the connected nodes list before triggering sampling (ie sniffing). While the fix is simple, testing was not because there were no existing tests for sniffing. This change also modifies the mocks used by transport client unit tests in order to allow mocking sniffing. --- .../TransportClientNodesService.java | 18 ++- .../transport/TransportService.java | 2 + .../transport/FailAndRetryMockTransport.java | 21 ++- .../TransportClientNodesServiceTests.java | 120 +++++++++++++----- 4 files changed, 125 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index b403a30ecd8aa..840ebe0fdc138 100644 --- a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -186,15 +186,25 @@ public TransportClientNodesService removeTransportAddress(TransportAddress trans if (closed) { throw new IllegalStateException("transport client is closed, can't remove an address"); } - List builder = new ArrayList<>(); + List listNodesBuilder = new ArrayList<>(); for (DiscoveryNode otherNode : listedNodes) { if (!otherNode.getAddress().equals(transportAddress)) { - builder.add(otherNode); + listNodesBuilder.add(otherNode); } else { - logger.debug("removing address [{}]", otherNode); + logger.debug("removing address [{}] from list nodes", otherNode); } } - listedNodes = Collections.unmodifiableList(builder); + listedNodes = Collections.unmodifiableList(listNodesBuilder); + List nodesBuilder = new ArrayList<>(); + for (DiscoveryNode otherNode : nodes) { + if (!otherNode.getAddress().equals(transportAddress)) { + nodesBuilder.add(otherNode); + } else { + logger.debug("removing address [{}] from connected nodes", otherNode); + transportService.disconnectFromNode(otherNode); + } + } + nodes = Collections.unmodifiableList(nodesBuilder); nodesSampler.sample(); } return this; diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 16c1842adcab2..60cdaf7e9785a 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -547,6 +547,8 @@ protected void doRun() throws Exception { holderToNotify.handler().handleException(sendRequestException); } }); + } else { + logger.debug("Exception while sending request, handler likely already notified due to timeout", e); } } } diff --git a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index e94b0b0c8d67a..908d2eb6d1edc 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -20,8 +20,13 @@ package org.elasticsearch.client.transport; import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse; +import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; +import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Settings; @@ -64,6 +69,8 @@ abstract class FailAndRetryMockTransport imp this.clusterName = clusterName; } + protected abstract ClusterState getMockClusterState(DiscoveryNode node); + @Override @SuppressWarnings("unchecked") public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) @@ -71,9 +78,17 @@ public void sendRequest(DiscoveryNode node, long requestId, String action, Trans //we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info if (connectMode) { - TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); - transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY), - node)); + if (TransportLivenessAction.NAME.equals(action)) { + TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); + transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY), + node)); + } else if (ClusterStateAction.NAME.equals(action)) { + TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); + ClusterState clusterState = getMockClusterState(node); + transportResponseHandler.handleResponse(new ClusterStateResponse(clusterName, clusterState)); + } else { + throw new UnsupportedOperationException("Mock transport does not understand action " + action); + } return; } diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index cd6cd251b34ed..f513660de95a7 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -19,37 +19,44 @@ package org.elasticsearch.client.transport; +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse; import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportInterceptor; -import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; - -import java.io.Closeable; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import org.hamcrest.CustomMatcher; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.everyItem; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -62,14 +69,44 @@ private static class TestIteration implements Closeable { private final FailAndRetryMockTransport transport; private final TransportService transportService; private final TransportClientNodesService transportClientNodesService; - private final int nodesCount; + private final int listNodesCount; + private final int sniffNodesCount; private TransportAddress livenessAddress = buildNewFakeTransportAddress(); - public Set nodeAddresses = new HashSet<>(); + final List listNodeAddresses; + // map for each address of the nodes a cluster state request should respond with + final Map nodeMap; - TestIteration() { - Settings settings = Settings.builder().put("cluster.name", "test").build(); + TestIteration(Object... extraSettings) { + Settings settings = Settings.builder().put(extraSettings).put("cluster.name", "test").build(); ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); + List listNodes = new ArrayList<>(); + Map nodeMap = new HashMap<>(); + this.listNodesCount = randomIntBetween(1, 10); + int sniffNodesCount = 0; + for (int i = 0; i < listNodesCount; i++) { + TransportAddress transportAddress = buildNewFakeTransportAddress(); + listNodes.add(transportAddress); + DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); + discoNodes.add(new DiscoveryNode("#list-node#-" + transportAddress, transportAddress, Version.CURRENT)); + + if (TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings)) { + final int numSniffNodes = randomIntBetween(0, 3); + for (int j = 0; j < numSniffNodes; ++j) { + TransportAddress sniffAddress = buildNewFakeTransportAddress(); + DiscoveryNode sniffNode = new DiscoveryNode("#sniff-node#-" + sniffAddress, sniffAddress, Version.CURRENT); + discoNodes.add(sniffNode); + // also allow sniffing of the sniff node itself + nodeMap.put(sniffAddress, DiscoveryNodes.builder().add(sniffNode).build()); + ++sniffNodesCount; + } + } + nodeMap.put(transportAddress, discoNodes.build()); + } + listNodeAddresses = listNodes; + this.nodeMap = nodeMap; + this.sniffNodesCount = sniffNodesCount; + threadPool = new TestThreadPool("transport-client-nodes-service-tests"); transport = new FailAndRetryMockTransport(random(), clusterName) { @Override @@ -79,7 +116,12 @@ public List getLocalAddresses() { @Override protected TestResponse newResponse() { - return new TestResponse(); + return new TestResponse(); + } + + @Override + protected ClusterState getMockClusterState(DiscoveryNode node) { + return ClusterState.builder(clusterName).nodes(TestIteration.this.nodeMap.get(node.getAddress())).build(); } }; transportService = new TransportService(settings, transport, threadPool, new TransportInterceptor() { @@ -101,14 +143,8 @@ public void sendRequest(DiscoveryNode node, String transportService.start(); transportService.acceptIncomingRequests(); transportClientNodesService = - new TransportClientNodesService(settings, transportService, threadPool, (a, b) -> {}); - this.nodesCount = randomIntBetween(1, 10); - for (int i = 0; i < nodesCount; i++) { - TransportAddress transportAddress = buildNewFakeTransportAddress(); - nodeAddresses.add(transportAddress); - transportClientNodesService.addTransportAddresses(transportAddress); - } - transport.endConnectMode(); + new TransportClientNodesService(settings, transportService, threadPool, (a, b) -> {}); + transportClientNodesService.addTransportAddresses(listNodeAddresses.toArray(new TransportAddress[0])); } private TransportResponseHandler wrapLivenessResponseHandler(TransportResponseHandler handler, @@ -145,7 +181,7 @@ public String executor() { @Override public void close() { - + transport.endConnectMode(); transportService.stop(); transportClientNodesService.close(); try { @@ -160,6 +196,7 @@ public void testListenerFailures() throws InterruptedException { int iters = iterations(10, 100); for (int i = 0; i finalFailure = new AtomicReference<>(); @@ -230,7 +267,7 @@ public String executor() { } } - assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.nodesCount)); + assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.listNodesCount)); assertThat(iteration.transport.triedNodes().size(), equalTo(iteration.transport.connectTransportExceptions() + iteration.transport.failures() + iteration.transport.successes())); } @@ -241,17 +278,42 @@ public void testConnectedNodes() { int iters = iterations(10, 100); for (int i = 0; i ("removed address") { + @Override + public boolean matches(Object item) { + return item instanceof DiscoveryNode && ((DiscoveryNode)item).getAddress().equals(addressToRemove); + } + }))); + assertEquals(iteration.listNodesCount + iteration.sniffNodesCount - 1, service.connectedNodes().size()); + } + } + public static class TestRequest extends TransportRequest { } From 77cc3488c64ed2a91bb8e689a3012152a91adcd0 Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Tue, 22 Nov 2016 16:15:06 -0800 Subject: [PATCH 2/2] Improve log messages for removing transport client address --- .../client/transport/TransportClientNodesService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 840ebe0fdc138..97df190597561 100644 --- a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -191,7 +191,7 @@ public TransportClientNodesService removeTransportAddress(TransportAddress trans if (!otherNode.getAddress().equals(transportAddress)) { listNodesBuilder.add(otherNode); } else { - logger.debug("removing address [{}] from list nodes", otherNode); + logger.debug("removing address [{}] from listed nodes", otherNode); } } listedNodes = Collections.unmodifiableList(listNodesBuilder); @@ -200,7 +200,7 @@ public TransportClientNodesService removeTransportAddress(TransportAddress trans if (!otherNode.getAddress().equals(transportAddress)) { nodesBuilder.add(otherNode); } else { - logger.debug("removing address [{}] from connected nodes", otherNode); + logger.debug("disconnecting from node with address [{}]", otherNode); transportService.disconnectFromNode(otherNode); } }