Skip to content

Commit

Permalink
Java API TransportClient can fail on remote node shutdown instead of …
Browse files Browse the repository at this point in the history
…retrying the next connected node under heavy load, closes #1229.
  • Loading branch information
kimchy committed Aug 11, 2011
1 parent 0d75547 commit fc6e0dd
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 156 deletions.
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.client.transport;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
Expand Down Expand Up @@ -143,6 +144,10 @@ public <T> T execute(NodeCallback<T> callback) throws ElasticSearchException {
throw new NoNodeAvailableException();
}
int index = randomNodeGenerator.incrementAndGet();
if (index < 0) {
index = 0;
randomNodeGenerator.set(0);
}
for (int i = 0; i < nodes.size(); i++) {
DiscoveryNode node = nodes.get((index + i) % nodes.size());
try {
Expand All @@ -154,6 +159,62 @@ public <T> T execute(NodeCallback<T> callback) throws ElasticSearchException {
throw new NoNodeAvailableException();
}

public <Response> void execute(NodeListenerCallback<Response> callback, ActionListener<Response> listener) throws ElasticSearchException {
ImmutableList<DiscoveryNode> nodes = this.nodes;
if (nodes.isEmpty()) {
throw new NoNodeAvailableException();
}
int index = randomNodeGenerator.incrementAndGet();
if (index < 0) {
index = 0;
randomNodeGenerator.set(0);
}
RetryListener<Response> retryListener = new RetryListener<Response>(callback, listener, nodes, index);
try {
callback.doWithNode(nodes.get((index) % nodes.size()), retryListener);
} catch (ConnectTransportException e) {
retryListener.onFailure(e);
}
}

public static class RetryListener<Response> implements ActionListener<Response> {
private final NodeListenerCallback<Response> callback;
private final ActionListener<Response> listener;
private final ImmutableList<DiscoveryNode> nodes;
private final int index;

private volatile int i;

public RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener, ImmutableList<DiscoveryNode> nodes, int index) {
this.callback = callback;
this.listener = listener;
this.nodes = nodes;
this.index = index;
}

@Override public void onResponse(Response response) {
listener.onResponse(response);
}

@Override public void onFailure(Throwable e) {
if (e instanceof ConnectTransportException) {
int i = ++this.i;
if (i == nodes.size()) {
listener.onFailure(new NoNodeAvailableException());
} else {
try {
callback.doWithNode(nodes.get((index + i) % nodes.size()), this);
} catch (Exception e1) {
// retry the next one...
onFailure(e);
}
}
} else {
listener.onFailure(e);
}
}
}

public void close() {
closed = true;
nodesSamplerFuture.cancel(true);
Expand Down Expand Up @@ -292,4 +353,9 @@ public static interface NodeCallback<T> {

T doWithNode(DiscoveryNode node) throws ElasticSearchException;
}

public static interface NodeListenerCallback<Response> {

void doWithNode(DiscoveryNode node, ActionListener<Response> listener) throws ElasticSearchException;
}
}
Expand Up @@ -139,12 +139,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
}

@Override public void index(final IndexRequest request, final ActionListener<IndexResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<IndexResponse>() {
@Override public void doWithNode(DiscoveryNode node, ActionListener<IndexResponse> listener) throws ElasticSearchException {
indexAction.execute(node, request, listener);
return null;
}
});
}, listener);
}

@Override public ActionFuture<DeleteResponse> delete(final DeleteRequest request) {
Expand All @@ -156,12 +155,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
}

@Override public void delete(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<DeleteResponse>() {
@Override public void doWithNode(DiscoveryNode node, ActionListener<DeleteResponse> listener) throws ElasticSearchException {
deleteAction.execute(node, request, listener);
return null;
}
});
}, listener);
}

@Override public ActionFuture<BulkResponse> bulk(final BulkRequest request) {
Expand All @@ -173,12 +171,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
}

@Override public void bulk(final BulkRequest request, final ActionListener<BulkResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<BulkResponse>() {
@Override public void doWithNode(DiscoveryNode node, ActionListener<BulkResponse> listener) throws ElasticSearchException {
bulkAction.execute(node, request, listener);
return null;
}
});
}, listener);
}

@Override public ActionFuture<DeleteByQueryResponse> deleteByQuery(final DeleteByQueryRequest request) {
Expand All @@ -190,12 +187,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
}

@Override public void deleteByQuery(final DeleteByQueryRequest request, final ActionListener<DeleteByQueryResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<DeleteByQueryResponse>() {
@Override public void doWithNode(DiscoveryNode node, ActionListener<DeleteByQueryResponse> listener) throws ElasticSearchException {
deleteByQueryAction.execute(node, request, listener);
return null;
}
});
}, listener);
}

@Override public ActionFuture<GetResponse> get(final GetRequest request) {
Expand All @@ -207,12 +203,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
}

@Override public void get(final GetRequest request, final ActionListener<GetResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<GetResponse>() {
@Override public void doWithNode(DiscoveryNode node, ActionListener<GetResponse> listener) throws ElasticSearchException {
getAction.execute(node, request, listener);
return null;
}
});
}, listener);
}

@Override public ActionFuture<MultiGetResponse> multiGet(final MultiGetRequest request) {
Expand All @@ -224,12 +219,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
}

@Override public void multiGet(final MultiGetRequest request, final ActionListener<MultiGetResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<MultiGetResponse>() {
@Override public void doWithNode(DiscoveryNode node, ActionListener<MultiGetResponse> listener) throws ElasticSearchException {
multiGetAction.execute(node, request, listener);
return null;
}
});
}, listener);
}

@Override public ActionFuture<CountResponse> count(final CountRequest request) {
Expand All @@ -241,12 +235,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
}

@Override public void count(final CountRequest request, final ActionListener<CountResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<CountResponse>() {
@Override public void doWithNode(DiscoveryNode node, ActionListener<CountResponse> listener) throws ElasticSearchException {
countAction.execute(node, request, listener);
return null;
}
});
}, listener);
}

@Override public ActionFuture<SearchResponse> search(final SearchRequest request) {
Expand All @@ -258,12 +251,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
}

@Override public void search(final SearchRequest request, final ActionListener<SearchResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<SearchResponse>() {
@Override public void doWithNode(DiscoveryNode node, ActionListener<SearchResponse> listener) throws ElasticSearchException {
searchAction.execute(node, request, listener);
return null;
}
});
}, listener);
}

@Override public ActionFuture<SearchResponse> searchScroll(final SearchScrollRequest request) {
Expand All @@ -275,12 +267,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
}

@Override public void searchScroll(final SearchScrollRequest request, final ActionListener<SearchResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<SearchResponse>() {
@Override public void doWithNode(DiscoveryNode node, ActionListener<SearchResponse> listener) throws ElasticSearchException {
searchScrollAction.execute(node, request, listener);
return null;
}
});
}, listener);
}

@Override public ActionFuture<SearchResponse> moreLikeThis(final MoreLikeThisRequest request) {
Expand All @@ -292,12 +283,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
}

@Override public void moreLikeThis(final MoreLikeThisRequest request, final ActionListener<SearchResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<SearchResponse>() {
@Override public void doWithNode(DiscoveryNode node, ActionListener<SearchResponse> listener) throws ElasticSearchException {
moreLikeThisAction.execute(node, request, listener);
return null;
}
});
}, listener);
}

@Override public ActionFuture<PercolateResponse> percolate(final PercolateRequest request) {
Expand All @@ -309,11 +299,10 @@ public class InternalTransportClient extends AbstractClient implements InternalC
}

@Override public void percolate(final PercolateRequest request, final ActionListener<PercolateResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<PercolateResponse>() {
@Override public void doWithNode(DiscoveryNode node, ActionListener<PercolateResponse> listener) throws ElasticSearchException {
percolateAction.execute(node, request, listener);
return null;
}
});
}, listener);
}
}

0 comments on commit fc6e0dd

Please sign in to comment.