Java API: Improve TransportClient in sniff mode to be more lightweight on connections and API, closes elastic#1898.
kimchy committed May 2, 2012
1 parent 8944456 commit 8e2311f
Showing 2 changed files with 37 additions and 30 deletions.
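This commit changes how the TransportClient's sniff-mode sampler discovers nodes: instead of issuing a cluster-wide nodes-info request over a fully established connection, it opens a lightweight connection to listed nodes and asks each one only for its local, heavily filtered cluster state. Below is a minimal usage sketch of a sniff-mode client, assuming the client API of this era (the cluster name, host, and port are placeholders):

    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.ImmutableSettings;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;

    public class SniffClientExample {
        public static void main(String[] args) {
            // "client.transport.sniff" enables the sampling behaviour this commit makes lighter.
            Settings settings = ImmutableSettings.settingsBuilder()
                    .put("cluster.name", "my-cluster")     // placeholder cluster name
                    .put("client.transport.sniff", true)
                    .build();

            TransportClient client = new TransportClient(settings)
                    // seed ("listed") node; discovered data nodes are added by the sampler
                    .addTransportAddress(new InetSocketTransportAddress("127.0.0.1", 9300));

            // ... use the client ...

            client.close();
        }
    }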
@@ -36,7 +36,7 @@ public class ClusterStateResponse implements ActionResponse {

private ClusterState clusterState;

ClusterStateResponse() {
public ClusterStateResponse() {
}

ClusterStateResponse(ClusterName clusterName, ClusterState clusterState) {
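Making the no-arg ClusterStateResponse constructor public, as shown above, is what lets the client-side response handler in another package instantiate an empty response and fill it from the wire. A minimal sketch of that pattern, assuming the Streamable-style contract of this era (ResponseReadSketch and readResponse are illustrative helpers, not actual transport internals):

    import java.io.IOException;

    import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
    import org.elasticsearch.common.io.stream.StreamInput;

    class ResponseReadSketch {
        // Illustrative only: how a transport response handler materializes a response.
        static ClusterStateResponse readResponse(StreamInput in) throws IOException {
            ClusterStateResponse response = new ClusterStateResponse(); // needs the public no-arg constructor
            response.readFrom(in);                                      // populate fields from the stream
            return response;
        }
    }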
@@ -20,13 +20,15 @@
package org.elasticsearch.client.transport;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import jsr166y.LinkedTransferQueue;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -40,8 +42,7 @@

import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
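The import swap above mirrors the new bookkeeping in sample(): the address-keyed Map of nodes becomes a plain Set, and responses are collected in a LinkedTransferQueue (from the bundled jsr166y backport) instead of a CopyOnWriteArrayList, while a CountDownLatch waits for every ping to complete. A small self-contained sketch of that fan-out/collect pattern, using java.util.concurrent.LinkedTransferQueue and plain strings in place of the real node and response types:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedTransferQueue;

    public class FanOutCollectSketch {
        public static void main(String[] args) throws InterruptedException {
            List<String> nodesToPing = Arrays.asList("node-a", "node-b", "node-c"); // stand-ins for DiscoveryNode
            final Queue<String> responses = new LinkedTransferQueue<String>();      // concurrent, unbounded collector
            final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
            ExecutorService executor = Executors.newFixedThreadPool(2);

            for (final String node : nodesToPing) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            responses.add("local-state-of-" + node); // stand-in for a ClusterStateResponse
                        } finally {
                            latch.countDown();                       // count down on success and on failure
                        }
                    }
                });
            }

            latch.await(); // wait until every node has answered or failed
            System.out.println(responses);
            executor.shutdown();
        }
    }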
@@ -307,39 +308,48 @@ public synchronized void sample() {

// the nodes we are going to ping include the core listed nodes that were added
// and the last round of discovered nodes
Map<TransportAddress, DiscoveryNode> nodesToPing = Maps.newHashMap();
Set<DiscoveryNode> nodesToPing = Sets.newHashSet();
for (DiscoveryNode node : listedNodes) {
nodesToPing.put(node.address(), node);
nodesToPing.add(node);
}
for (DiscoveryNode node : nodes) {
nodesToPing.put(node.address(), node);
nodesToPing.add(node);
}

final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
final CopyOnWriteArrayList<NodesInfoResponse> nodesInfoResponses = new CopyOnWriteArrayList<NodesInfoResponse>();
for (final DiscoveryNode listedNode : nodesToPing.values()) {
final LinkedTransferQueue<ClusterStateResponse> clusterStateResponses = new LinkedTransferQueue<ClusterStateResponse>();
for (final DiscoveryNode listedNode : nodesToPing) {
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
@Override
public void run() {
try {
if (!transportService.nodeConnected(listedNode)) {
try {
logger.trace("connecting to node [{}]", listedNode);
transportService.connectToNode(listedNode);

// if it's one of the actual nodes we will talk to, not just a listed node, fully connect
if (nodes.contains(listedNode)) {
logger.trace("connecting to cluster node [{}]", listedNode);
transportService.connectToNode(listedNode);
} else {
// it's a listed node, light connect to it...
logger.trace("connecting to listed node (light) [{}]", listedNode);
transportService.connectToNodeLight(listedNode);
}
} catch (Exception e) {
logger.debug("failed to connect to node [{}], ignoring...", e, listedNode);
latch.countDown();
return;
}
}
transportService.sendRequest(listedNode, NodesInfoAction.NAME,
Requests.nodesInfoRequest("_all"),
transportService.sendRequest(listedNode, ClusterStateAction.NAME,
Requests.clusterStateRequest()
.filterAll().filterNodes(false).local(true),
TransportRequestOptions.options().withHighType().withTimeout(pingTimeout),
new BaseTransportResponseHandler<NodesInfoResponse>() {
new BaseTransportResponseHandler<ClusterStateResponse>() {

@Override
public NodesInfoResponse newInstance() {
return new NodesInfoResponse();
public ClusterStateResponse newInstance() {
return new ClusterStateResponse();
}

@Override
@@ -348,20 +358,20 @@ public String executor() {
}

@Override
public void handleResponse(NodesInfoResponse response) {
nodesInfoResponses.add(response);
public void handleResponse(ClusterStateResponse response) {
clusterStateResponses.add(response);
latch.countDown();
}

@Override
public void handleException(TransportException e) {
logger.info("failed to get node info for {}, disconnecting...", e, listedNode);
logger.info("failed to get local cluster state for {}, disconnecting...", e, listedNode);
transportService.disconnectFromNode(listedNode);
latch.countDown();
}
});
} catch (Exception e) {
logger.info("failed to get node info for {}, disconnecting...", e, listedNode);
logger.info("failed to get local cluster state info for {}, disconnecting...", e, listedNode);
transportService.disconnectFromNode(listedNode);
latch.countDown();
}
@@ -376,15 +386,12 @@ public void handleException(TransportException e) {
}

HashSet<DiscoveryNode> newNodes = new HashSet<DiscoveryNode>();
for (NodesInfoResponse nodesInfoResponse : nodesInfoResponses) {
for (NodeInfo nodeInfo : nodesInfoResponse) {
if (!clusterName.equals(nodesInfoResponse.clusterName())) {
logger.warn("node {} not part of the cluster {}, ignoring...", nodeInfo.node(), clusterName);
} else {
if (nodeInfo.node().dataNode()) { // only add data nodes to connect to
newNodes.add(nodeInfo.node());
}
}
for (ClusterStateResponse clusterStateResponse : clusterStateResponses) {
if (!clusterName.equals(clusterStateResponse.clusterName())) {
logger.warn("node {} not part of the cluster {}, ignoring...", clusterStateResponse.state().nodes().localNode(), clusterName);
}
for (DiscoveryNode node : clusterStateResponse.state().nodes().dataNodes().values()) {
newNodes.add(node);
}
}
// now, make sure we are connected to all the updated nodes
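Taken together, the sniffing pass in the second file now works like this: nodes that are only listed seeds get a lightweight connection (connectToNodeLight) while nodes already in the routing set keep a full connection, each pinged node is asked for its local cluster state with everything but the nodes section filtered out, and the sampler then warns about responses reporting a different cluster name and collects the data nodes from each response. A condensed sketch of that flow with hypothetical stand-in types (SniffFlowSketch, SniffedNode, LocalState, and fetchLocalState are illustrative, not Elasticsearch API):

    import java.util.HashSet;
    import java.util.Set;

    class SniffFlowSketch {

        // Hypothetical stand-in for DiscoveryNode.
        static class SniffedNode {
            final String id;
            SniffedNode(String id) { this.id = id; }
            @Override public boolean equals(Object o) {
                return o instanceof SniffedNode && ((SniffedNode) o).id.equals(id);
            }
            @Override public int hashCode() { return id.hashCode(); }
        }

        // Hypothetical stand-in for a filtered, local ClusterStateResponse.
        static class LocalState {
            final String clusterName;
            final Set<SniffedNode> dataNodes;
            LocalState(String clusterName, Set<SniffedNode> dataNodes) {
                this.clusterName = clusterName;
                this.dataNodes = dataNodes;
            }
        }

        static Set<SniffedNode> sampleOnce(String expectedClusterName,
                                           Set<SniffedNode> listedNodes,
                                           Set<SniffedNode> currentNodes) {
            // Ping the configured seed nodes plus the nodes discovered in the previous round.
            Set<SniffedNode> nodesToPing = new HashSet<SniffedNode>();
            nodesToPing.addAll(listedNodes);
            nodesToPing.addAll(currentNodes);

            Set<SniffedNode> newNodes = new HashSet<SniffedNode>();
            for (SniffedNode node : nodesToPing) {
                boolean light = !currentNodes.contains(node);     // listed-only nodes get a light connection
                LocalState state = fetchLocalState(node, light);  // local=true, only the nodes section kept
                if (state == null) {
                    continue;                                     // connect/request failed: disconnect and move on
                }
                if (!expectedClusterName.equals(state.clusterName)) {
                    System.err.println("node not part of cluster " + expectedClusterName); // the real code logs a warning here
                }
                newNodes.addAll(state.dataNodes);                 // only data nodes are kept for request routing
            }
            return newNodes;
        }

        // Placeholder: the real code sends a filtered ClusterStateRequest over the transport service.
        static LocalState fetchLocalState(SniffedNode node, boolean lightConnection) {
            return null;
        }
    }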
