Skip to content

Commit

Permalink
Make SimpleNodeSampler populate the list of connected nodes using the…
Browse files Browse the repository at this point in the history
… information returned from the cluster

This is to allow people to introspect things like data settings and attributes. Also makes it consistent with the sniff sampler.

Closes elastic#4162
  • Loading branch information
bleskes committed Nov 13, 2013
1 parent a09d4a1 commit b86b471
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 25 deletions.
Expand Up @@ -252,7 +252,7 @@ public void onResponse(Response response) {
public void onFailure(Throwable e) {
if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) {
int i = ++this.i;
if (i == nodes.size()) {
if (i >= nodes.size()) {
listener.onFailure(new NoNodeAvailableException());
} else {
try {
Expand Down Expand Up @@ -296,6 +296,28 @@ public void sample() {
}

protected abstract void doSample();

/**
* validates a set of potentially newly discovered nodes and returns an immutable
* list of the nodes that has passed.
*/
protected ImmutableList<DiscoveryNode> validateNewNodes(Set<DiscoveryNode> nodes) {
for (Iterator<DiscoveryNode> it = nodes.iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
if (!transportService.nodeConnected(node)) {
try {
logger.trace("connecting to node [{}]", node);
transportService.connectToNode(node);
} catch (Throwable e) {
it.remove();
logger.debug("failed to connect to discovered node [" + node + "]", e);
}
}
}

return new ImmutableList.Builder<DiscoveryNode>().addAll(nodes).build();
}

}

class ScheduledNodeSampler implements Runnable {
Expand All @@ -317,17 +339,19 @@ class SimpleNodeSampler extends NodeSampler {
@Override
protected void doSample() {
HashSet<DiscoveryNode> newNodes = new HashSet<DiscoveryNode>();
for (DiscoveryNode node : listedNodes) {
if (!transportService.nodeConnected(node)) {
for (DiscoveryNode listedNode : listedNodes) {
if (!transportService.nodeConnected(listedNode)) {
try {
transportService.connectToNode(node);
// its a listed node, light connect to it...
logger.trace("connecting to listed node (light) [{}]", listedNode);
transportService.connectToNodeLight(listedNode);
} catch (Throwable e) {
logger.debug("failed to connect to node [{}], removed from nodes list", e, node);
logger.debug("failed to connect to node [{}], removed from nodes list", e, listedNode);
continue;
}
}
try {
NodesInfoResponse nodeInfo = transportService.submitRequest(node, NodesInfoAction.NAME,
NodesInfoResponse nodeInfo = transportService.submitRequest(listedNode, NodesInfoAction.NAME,
Requests.nodesInfoRequest("_local"),
TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout),
new FutureTransportResponseHandler<NodesInfoResponse>() {
Expand All @@ -337,16 +361,26 @@ public NodesInfoResponse newInstance() {
}
}).txGet();
if (!ignoreClusterName && !clusterName.equals(nodeInfo.getClusterName())) {
logger.warn("node {} not part of the cluster {}, ignoring...", node, clusterName);
logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
} else if (nodeInfo.getNodes().length != 0) {
// use discovered information but do keep the original transport address, so people can control which address
// is exactly used.

DiscoveryNode nodeWithInfo = nodeInfo.getNodes()[0].getNode();
newNodes.add(new DiscoveryNode(nodeWithInfo.name(), nodeWithInfo.id(), listedNode.address(), nodeWithInfo.attributes(), nodeWithInfo.version()));
} else {
newNodes.add(node);
// although we asked for one node, our target may not have completed initialization yet and doesn't have
// cluster nodes
logger.debug("node {} didn't return any discovery info, temporarily using transport discovery node", listedNode);
newNodes.add(listedNode);
}
} catch (Exception e) {
logger.info("failed to get node info for {}, disconnecting...", e, node);
transportService.disconnectFromNode(node);
logger.info("failed to get node info for {}, disconnecting...", e, listedNode);
transportService.disconnectFromNode(listedNode);
}
}
nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();

nodes = validateNewNodes(newNodes);
}
}

Expand Down Expand Up @@ -442,20 +476,8 @@ public void handleException(TransportException e) {
newNodes.add(node);
}
}
// now, make sure we are connected to all the updated nodes
for (Iterator<DiscoveryNode> it = newNodes.iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
if (!transportService.nodeConnected(node)) {
try {
logger.trace("connecting to node [{}]", node);
transportService.connectToNode(node);
} catch (Throwable e) {
it.remove();
logger.debug("failed to connect to discovered node [" + node + "]", e);
}
}
}
nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();

nodes = validateNewNodes(newNodes);
}
}

Expand Down
@@ -0,0 +1,39 @@
package org.elasticsearch.client.transport;
/*
* Licensed to ElasticSearch under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.hamcrest.Matchers;
import org.junit.Test;

@ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numNodes = 0, transportClientRatio = 1.0)
public class TransportClientTests extends ElasticsearchIntegrationTest {

@Test
public void testPickingUpChangesInDiscoveryNode() {
String nodeName = cluster().startNode(ImmutableSettings.builder().put("node.data", false));

TransportClient client = (TransportClient) cluster().client(nodeName);
assertThat(client.connectedNodes().get(0).dataNode(), Matchers.equalTo(false));

}
}

0 comments on commit b86b471

Please sign in to comment.