Skip to content

Commit

Permalink
Transport Client: fixed the node retry mechanism which could fail wit…
Browse files Browse the repository at this point in the history
…hout trying all the connected nodes

The RetryListener was notified twice for each single failure, which caused some additional retries, but more importantly was making the client reach the maximum number of retries (number of connected nodes) too quickly, meanwhile ongoing retries which could succeed were not completed yet.

The TransportService used to throw ConnectTransportException due to throwConnectException set to true, and also notify the listener of any exception received from a separate thread through the request holder.

Simplified exception handling by just removing the throwConnectException option from the TransportService, used only in the transport client. The transport client now relies solely on the request holder to notify of failures and eventually retry.

Closes #6829
  • Loading branch information
javanna committed Jul 28, 2014
1 parent eecbf8a commit fcf4d5a
Show file tree
Hide file tree
Showing 12 changed files with 829 additions and 111 deletions.
Expand Up @@ -19,8 +19,6 @@

package org.elasticsearch.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -31,8 +29,6 @@
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

import static org.elasticsearch.action.support.PlainActionFuture.newFuture;

/**
* A generic proxy that will execute the given action against a specific node.
*/
Expand All @@ -52,13 +48,6 @@ public TransportActionNodeProxy(Settings settings, GenericAction<Request, Respon
this.transportOptions = action.transportOptions(settings);
}

public ActionFuture<Response> execute(DiscoveryNode node, Request request) throws ElasticsearchException {
PlainActionFuture<Response> future = newFuture();
request.listenerThreaded(false);
execute(node, request, future);
return future;
}

public void execute(DiscoveryNode node, final Request request, final ActionListener<Response> listener) {
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
Expand Down
Expand Up @@ -111,9 +111,6 @@ public TransportClientNodesService(Settings settings, ClusterName clusterName, T
this.nodesSampler = new SimpleNodeSampler();
}
this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());

// we want the transport service to throw connect exceptions, so we can retry
transportService.throwConnectException(true);
}

public ImmutableList<TransportAddress> transportAddresses() {
Expand Down Expand Up @@ -190,25 +187,6 @@ public TransportClientNodesService removeTransportAddress(TransportAddress trans
return this;
}

public <T> T execute(NodeCallback<T> callback) throws ElasticsearchException {
ImmutableList<DiscoveryNode> nodes = this.nodes;
ensureNodesAreAvailable(nodes);
int index = getNodeNumber();
for (int i = 0; i < nodes.size(); i++) {
DiscoveryNode node = nodes.get((index + i) % nodes.size());
try {
return callback.doWithNode(node);
} catch (ElasticsearchException e) {
if (e.unwrapCause() instanceof ConnectTransportException) {
logConnectTransportException((ConnectTransportException) e.unwrapCause());
} else {
throw e;
}
}
}
throw new NoNodeAvailableException("None of the configured nodes were available: " + nodes);
}

public <Response> void execute(NodeListenerCallback<Response> callback, ActionListener<Response> listener) throws ElasticsearchException {
ImmutableList<DiscoveryNode> nodes = this.nodes;
ensureNodesAreAvailable(nodes);
Expand All @@ -217,12 +195,9 @@ public <Response> void execute(NodeListenerCallback<Response> callback, ActionLi
DiscoveryNode node = nodes.get((index) % nodes.size());
try {
callback.doWithNode(node, retryListener);
} catch (ElasticsearchException e) {
if (e.unwrapCause() instanceof ConnectTransportException) {
retryListener.onFailure(e);
} else {
throw e;
}
} catch (Throwable t) {
//this exception can't come from the TransportService as it doesn't throw exception at all
listener.onFailure(t);
}
}

Expand Down Expand Up @@ -255,9 +230,9 @@ public void onFailure(Throwable e) {
} else {
try {
callback.doWithNode(nodes.get((index + i) % nodes.size()), this);
} catch (Throwable e1) {
// retry the next one...
onFailure(e1);
} catch(Throwable t) {
//this exception can't come from the TransportService as it doesn't throw exceptions at all
listener.onFailure(t);
}
}
} else {
Expand Down Expand Up @@ -299,14 +274,6 @@ private void ensureNodesAreAvailable(ImmutableList<DiscoveryNode> nodes) {
}
}

private void logConnectTransportException(ConnectTransportException connectTransportException) {
if (logger.isTraceEnabled()) {
logger.trace("Could not connect to [{}] for action [{}], error [{}] [{}]", connectTransportException, connectTransportException.node(), connectTransportException.action(), connectTransportException.status().name(), connectTransportException.getMessage());
} else {
logger.debug("Could not connect to [{}] for action [{}], error [{}] [{}]", connectTransportException.node(), connectTransportException.action(), connectTransportException.status().name(), connectTransportException.getMessage());
}
}

abstract class NodeSampler {
public void sample() {
synchronized (mutex) {
Expand Down Expand Up @@ -507,13 +474,8 @@ public void handleException(TransportException e) {
}
}

public static interface NodeCallback<T> {

T doWithNode(DiscoveryNode node) throws ElasticsearchException;
}

public static interface NodeListenerCallback<Response> {

void doWithNode(DiscoveryNode node, ActionListener<Response> listener) throws ElasticsearchException;
void doWithNode(DiscoveryNode node, ActionListener<Response> listener);
}
}
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -32,17 +31,13 @@
*/
public class InternalTransportAdminClient extends AbstractComponent implements AdminClient {

private final TransportClientNodesService nodesService;

private final InternalTransportIndicesAdminClient indicesAdminClient;

private final InternalTransportClusterAdminClient clusterAdminClient;

@Inject
public InternalTransportAdminClient(Settings settings, TransportClientNodesService nodesService,
InternalTransportIndicesAdminClient indicesAdminClient, InternalTransportClusterAdminClient clusterAdminClient) {
public InternalTransportAdminClient(Settings settings, InternalTransportIndicesAdminClient indicesAdminClient, InternalTransportClusterAdminClient clusterAdminClient) {
super(settings);
this.nodesService = nodesService;
this.indicesAdminClient = indicesAdminClient;
this.clusterAdminClient = clusterAdminClient;
}
Expand Down
Expand Up @@ -20,8 +20,8 @@
package org.elasticsearch.client.transport.support;

import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.*;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient;
Expand Down Expand Up @@ -89,13 +89,9 @@ public AdminClient admin() {
@SuppressWarnings("unchecked")
@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(final Action<Request, Response, RequestBuilder, Client> action, final Request request) {
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<Response>>() {
@Override
public ActionFuture<Response> doWithNode(DiscoveryNode node) throws ElasticsearchException {
return proxy.execute(node, request);
}
});
PlainActionFuture<Response> actionFuture = PlainActionFuture.newFuture();
execute(action, request, actionFuture);
return actionFuture;
}

@SuppressWarnings("unchecked")
Expand All @@ -104,7 +100,7 @@ public <Request extends ActionRequest, Response extends ActionResponse, RequestB
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
@Override
public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) throws ElasticsearchException {
public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
proxy.execute(node, request, listener);
}
}, listener);
Expand Down
Expand Up @@ -20,9 +20,9 @@
package org.elasticsearch.client.transport.support;

import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.*;
import org.elasticsearch.action.admin.cluster.ClusterAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.support.AbstractClusterAdminClient;
import org.elasticsearch.client.transport.TransportClientNodesService;
Expand Down Expand Up @@ -69,13 +69,9 @@ public ThreadPool threadPool() {
@SuppressWarnings("unchecked")
@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> ActionFuture<Response> execute(final Action<Request, Response, RequestBuilder, ClusterAdminClient> action, final Request request) {
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<Response>>() {
@Override
public ActionFuture<Response> doWithNode(DiscoveryNode node) throws ElasticsearchException {
return proxy.execute(node, request);
}
});
PlainActionFuture<Response> actionFuture = PlainActionFuture.newFuture();
execute(action, request, actionFuture);
return actionFuture;
}

@SuppressWarnings("unchecked")
Expand All @@ -84,7 +80,7 @@ public <Request extends ActionRequest, Response extends ActionResponse, RequestB
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
@Override
public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) throws ElasticsearchException {
public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
proxy.execute(node, request, listener);
}
}, listener);
Expand Down
Expand Up @@ -20,9 +20,9 @@
package org.elasticsearch.client.transport.support;

import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.*;
import org.elasticsearch.action.admin.indices.IndicesAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.support.AbstractIndicesAdminClient;
import org.elasticsearch.client.transport.TransportClientNodesService;
Expand Down Expand Up @@ -69,13 +69,9 @@ public ThreadPool threadPool() {
@SuppressWarnings("unchecked")
@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> ActionFuture<Response> execute(final Action<Request, Response, RequestBuilder, IndicesAdminClient> action, final Request request) {
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<Response>>() {
@Override
public ActionFuture<Response> doWithNode(DiscoveryNode node) throws ElasticsearchException {
return proxy.execute(node, request);
}
});
PlainActionFuture<Response> actionFuture = PlainActionFuture.newFuture();
execute(action, request, actionFuture);
return actionFuture;
}

@SuppressWarnings("unchecked")
Expand All @@ -84,7 +80,7 @@ public <Request extends ActionRequest, Response extends ActionResponse, RequestB
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
@Override
public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) throws ElasticsearchException {
public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
proxy.execute(node, request, listener);
}
}, listener);
Expand Down
22 changes: 2 additions & 20 deletions src/main/java/org/elasticsearch/transport/TransportService.java
Expand Up @@ -69,7 +69,6 @@ protected boolean removeEldestEntry(Map.Entry eldest) {
}
});

private boolean throwConnectException = false;
private final TransportService.Adapter adapter = new Adapter();

public TransportService(Transport transport, ThreadPool threadPool) {
Expand Down Expand Up @@ -166,17 +165,6 @@ public void removeConnectionListener(TransportConnectionListener listener) {
connectionListeners.remove(listener);
}

/**
* Set to <tt>true</tt> to indicate that a {@link ConnectTransportException} should be thrown when
* sending a message (otherwise, it will be passed to the response handler). Defaults to <tt>false</tt>.
* <p/>
* <p>This is useful when logic based on connect failure is needed without having to wrap the handler,
* for example, in case of retries across several nodes.
*/
public void throwConnectException(boolean throwConnectException) {
this.throwConnectException = throwConnectException;
}

public <T extends TransportResponse> TransportFuture<T> submitRequest(DiscoveryNode node, String action, TransportRequest request,
TransportResponseHandler<T> handler) throws TransportException {
return submitRequest(node, action, request, TransportRequestOptions.EMPTY, handler);
Expand All @@ -190,12 +178,12 @@ public <T extends TransportResponse> TransportFuture<T> submitRequest(DiscoveryN
}

public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request,
final TransportResponseHandler<T> handler) throws TransportException {
final TransportResponseHandler<T> handler) {
sendRequest(node, action, request, TransportRequestOptions.EMPTY, handler);
}

public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request,
final TransportRequestOptions options, TransportResponseHandler<T> handler) throws TransportException {
final TransportRequestOptions options, TransportResponseHandler<T> handler) {
if (node == null) {
throw new ElasticsearchIllegalStateException("can't send request to a null node");
}
Expand Down Expand Up @@ -229,12 +217,6 @@ public void run() {
}
});
}

if (throwConnectException) {
if (e instanceof ConnectTransportException) {
throw (ConnectTransportException) e;
}
}
}
}

Expand Down

0 comments on commit fcf4d5a

Please sign in to comment.