Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport client: Fix remove address to actually work #21743

Merged
merged 2 commits into from
Nov 23, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,25 @@ public TransportClientNodesService removeTransportAddress(TransportAddress trans
if (closed) {
throw new IllegalStateException("transport client is closed, can't remove an address");
}
List<DiscoveryNode> builder = new ArrayList<>();
List<DiscoveryNode> listNodesBuilder = new ArrayList<>();
for (DiscoveryNode otherNode : listedNodes) {
if (!otherNode.getAddress().equals(transportAddress)) {
builder.add(otherNode);
listNodesBuilder.add(otherNode);
} else {
logger.debug("removing address [{}]", otherNode);
logger.debug("removing address [{}] from listed nodes", otherNode);
}
}
listedNodes = Collections.unmodifiableList(builder);
listedNodes = Collections.unmodifiableList(listNodesBuilder);
List<DiscoveryNode> nodesBuilder = new ArrayList<>();
for (DiscoveryNode otherNode : nodes) {
if (!otherNode.getAddress().equals(transportAddress)) {
nodesBuilder.add(otherNode);
} else {
logger.debug("disconnecting from node with address [{}]", otherNode);
transportService.disconnectFromNode(otherNode);
}
}
nodes = Collections.unmodifiableList(nodesBuilder);
nodesSampler.sample();
}
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,8 @@ protected void doRun() throws Exception {
holderToNotify.handler().handleException(sendRequestException);
}
});
} else {
logger.debug("Exception while sending request, handler likely already notified due to timeout", e);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note this debug statement is so that we do not completely swallow an exception where the holder has already been removed. This was causing an NPE I had while developing my mock code to be hidden. @jasontedor can explain more on why this is an "ok" case normally...

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
package org.elasticsearch.client.transport;

import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -64,16 +69,26 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
this.clusterName = clusterName;
}

protected abstract ClusterState getMockClusterState(DiscoveryNode node);

@Override
@SuppressWarnings("unchecked")
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {

//we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info
if (connectMode) {
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY),
node));
if (TransportLivenessAction.NAME.equals(action)) {
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY),
node));
} else if (ClusterStateAction.NAME.equals(action)) {
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
ClusterState clusterState = getMockClusterState(node);
transportResponseHandler.handleResponse(new ClusterStateResponse(clusterName, clusterState));
} else {
throw new UnsupportedOperationException("Mock transport does not understand action " + action);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,44 @@

package org.elasticsearch.client.transport;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.Closeable;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.CustomMatcher;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.everyItem;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
Expand All @@ -62,14 +69,44 @@ private static class TestIteration implements Closeable {
private final FailAndRetryMockTransport<TestResponse> transport;
private final TransportService transportService;
private final TransportClientNodesService transportClientNodesService;
private final int nodesCount;
private final int listNodesCount;
private final int sniffNodesCount;
private TransportAddress livenessAddress = buildNewFakeTransportAddress();
public Set<TransportAddress> nodeAddresses = new HashSet<>();
final List<TransportAddress> listNodeAddresses;
// map for each address of the nodes a cluster state request should respond with
final Map<TransportAddress, DiscoveryNodes> nodeMap;


TestIteration() {
Settings settings = Settings.builder().put("cluster.name", "test").build();
TestIteration(Object... extraSettings) {
Settings settings = Settings.builder().put(extraSettings).put("cluster.name", "test").build();
ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
List<TransportAddress> listNodes = new ArrayList<>();
Map<TransportAddress, DiscoveryNodes> nodeMap = new HashMap<>();
this.listNodesCount = randomIntBetween(1, 10);
int sniffNodesCount = 0;
for (int i = 0; i < listNodesCount; i++) {
TransportAddress transportAddress = buildNewFakeTransportAddress();
listNodes.add(transportAddress);
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
discoNodes.add(new DiscoveryNode("#list-node#-" + transportAddress, transportAddress, Version.CURRENT));

if (TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings)) {
final int numSniffNodes = randomIntBetween(0, 3);
for (int j = 0; j < numSniffNodes; ++j) {
TransportAddress sniffAddress = buildNewFakeTransportAddress();
DiscoveryNode sniffNode = new DiscoveryNode("#sniff-node#-" + sniffAddress, sniffAddress, Version.CURRENT);
discoNodes.add(sniffNode);
// also allow sniffing of the sniff node itself
nodeMap.put(sniffAddress, DiscoveryNodes.builder().add(sniffNode).build());
++sniffNodesCount;
}
}
nodeMap.put(transportAddress, discoNodes.build());
}
listNodeAddresses = listNodes;
this.nodeMap = nodeMap;
this.sniffNodesCount = sniffNodesCount;

threadPool = new TestThreadPool("transport-client-nodes-service-tests");
transport = new FailAndRetryMockTransport<TestResponse>(random(), clusterName) {
@Override
Expand All @@ -79,7 +116,12 @@ public List<String> getLocalAddresses() {

@Override
protected TestResponse newResponse() {
return new TestResponse();
return new TestResponse();
}

@Override
protected ClusterState getMockClusterState(DiscoveryNode node) {
return ClusterState.builder(clusterName).nodes(TestIteration.this.nodeMap.get(node.getAddress())).build();
}
};
transportService = new TransportService(settings, transport, threadPool, new TransportInterceptor() {
Expand All @@ -101,14 +143,8 @@ public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String
transportService.start();
transportService.acceptIncomingRequests();
transportClientNodesService =
new TransportClientNodesService(settings, transportService, threadPool, (a, b) -> {});
this.nodesCount = randomIntBetween(1, 10);
for (int i = 0; i < nodesCount; i++) {
TransportAddress transportAddress = buildNewFakeTransportAddress();
nodeAddresses.add(transportAddress);
transportClientNodesService.addTransportAddresses(transportAddress);
}
transport.endConnectMode();
new TransportClientNodesService(settings, transportService, threadPool, (a, b) -> {});
transportClientNodesService.addTransportAddresses(listNodeAddresses.toArray(new TransportAddress[0]));
}

private <T extends TransportResponse> TransportResponseHandler wrapLivenessResponseHandler(TransportResponseHandler<T> handler,
Expand Down Expand Up @@ -145,7 +181,7 @@ public String executor() {

@Override
public void close() {

transport.endConnectMode();
transportService.stop();
transportClientNodesService.close();
try {
Expand All @@ -160,6 +196,7 @@ public void testListenerFailures() throws InterruptedException {
int iters = iterations(10, 100);
for (int i = 0; i <iters; i++) {
try(final TestIteration iteration = new TestIteration()) {
iteration.transport.endConnectMode(); // stop transport from responding early
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger finalFailures = new AtomicInteger();
final AtomicReference<Throwable> finalFailure = new AtomicReference<>();
Expand Down Expand Up @@ -230,7 +267,7 @@ public String executor() {
}
}

assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.nodesCount));
assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.listNodesCount));
assertThat(iteration.transport.triedNodes().size(), equalTo(iteration.transport.connectTransportExceptions() +
iteration.transport.failures() + iteration.transport.successes()));
}
Expand All @@ -241,17 +278,42 @@ public void testConnectedNodes() {
int iters = iterations(10, 100);
for (int i = 0; i <iters; i++) {
try(final TestIteration iteration = new TestIteration()) {
assertThat(iteration.transportClientNodesService.connectedNodes().size(), lessThanOrEqualTo(iteration.nodesCount));
assertThat(iteration.transportClientNodesService.connectedNodes().size(), lessThanOrEqualTo(iteration.listNodesCount));
for (DiscoveryNode discoveryNode : iteration.transportClientNodesService.connectedNodes()) {
assertThat(discoveryNode.getHostName(), startsWith("liveness-"));
assertThat(discoveryNode.getHostAddress(), startsWith("liveness-"));
assertNotEquals(discoveryNode.getAddress(), iteration.livenessAddress);
assertThat(iteration.nodeAddresses, hasItem(discoveryNode.getAddress()));
assertThat(iteration.listNodeAddresses, hasItem(discoveryNode.getAddress()));
}
}
}
}

public void testRemoveAddressSniff() {
checkRemoveAddress(true);
}

public void testRemoveAddressSimple() {
checkRemoveAddress(false);
}

private void checkRemoveAddress(boolean sniff) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice tests.

Object[] extraSettings = {TransportClient.CLIENT_TRANSPORT_SNIFF.getKey(), sniff};
try(final TestIteration iteration = new TestIteration(extraSettings)) {
final TransportClientNodesService service = iteration.transportClientNodesService;
assertEquals(iteration.listNodesCount + iteration.sniffNodesCount, service.connectedNodes().size());
final TransportAddress addressToRemove = randomFrom(iteration.listNodeAddresses);
service.removeTransportAddress(addressToRemove);
assertThat(service.connectedNodes(), everyItem(not(new CustomMatcher<DiscoveryNode>("removed address") {
@Override
public boolean matches(Object item) {
return item instanceof DiscoveryNode && ((DiscoveryNode)item).getAddress().equals(addressToRemove);
}
})));
assertEquals(iteration.listNodesCount + iteration.sniffNodesCount - 1, service.connectedNodes().size());
}
}

public static class TestRequest extends TransportRequest {

}
Expand Down