Skip to content

Commit

Permalink
when fixing elastic#1229, we should also handle a case where the node…
Browse files Browse the repository at this point in the history
… is closing when connected from another node
  • Loading branch information
kimchy committed Aug 11, 2011
1 parent bad82cf commit 68d3d22
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 8 deletions.
Expand Up @@ -136,7 +136,7 @@ private void perform(@Nullable final Exception lastException) {
failure = new NoShardAvailableActionException(shardIt.shardId(), "No shard available for [" + request + "]");
} else {
if (logger.isDebugEnabled()) {
logger.debug(shardIt.shardId() + ": Failed to get [{}]", failure, request);
logger.debug(shardIt.shardId() + ": Failed to execute [{}]", failure, request);
}
}
listener.onFailure(failure);
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.client.transport;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
Expand Down Expand Up @@ -152,8 +153,10 @@ public <T> T execute(NodeCallback<T> callback) throws ElasticSearchException {
DiscoveryNode node = nodes.get((index + i) % nodes.size());
try {
return callback.doWithNode(node);
} catch (ConnectTransportException e) {
// retry in this case
} catch (ElasticSearchException e) {
if (!(e.unwrapCause() instanceof ConnectTransportException)) {
throw e;
}
}
}
throw new NoNodeAvailableException();
Expand All @@ -172,8 +175,12 @@ public <Response> void execute(NodeListenerCallback<Response> callback, ActionLi
RetryListener<Response> retryListener = new RetryListener<Response>(callback, listener, nodes, index);
try {
callback.doWithNode(nodes.get((index) % nodes.size()), retryListener);
} catch (ConnectTransportException e) {
retryListener.onFailure(e);
} catch (ElasticSearchException e) {
if (e.unwrapCause() instanceof ConnectTransportException) {
retryListener.onFailure(e);
} else {
throw e;
}
}
}

Expand All @@ -197,7 +204,7 @@ public RetryListener(NodeListenerCallback<Response> callback, ActionListener<Res
}

@Override public void onFailure(Throwable e) {
if (e instanceof ConnectTransportException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) {
int i = ++this.i;
if (i == nodes.size()) {
listener.onFailure(new NoNodeAvailableException());
Expand Down
Expand Up @@ -187,9 +187,15 @@ public String[] resolveNodes(String... nodesIds) {
Set<String> resolvedNodesIds = new HashSet<String>(nodesIds.length);
for (String nodeId : nodesIds) {
if (nodeId.equals("_local")) {
resolvedNodesIds.add(localNodeId());
String localNodeId = localNodeId();
if (localNodeId != null) {
resolvedNodesIds.add(localNodeId);
}
} else if (nodeId.equals("_master")) {
resolvedNodesIds.add(masterNodeId());
String masterNodeId = masterNodeId();
if (masterNodeId != null) {
resolvedNodesIds.add(masterNodeId);
}
} else if (nodeExists(nodeId)) {
resolvedNodesIds.add(nodeId);
} else {
Expand Down

0 comments on commit 68d3d22

Please sign in to comment.