Skip to content

Commit

Permalink
Invoke response handler on failure to send (#53631)
Browse files Browse the repository at this point in the history
Today it can happen that a transport message fails to send (for example,
because a transport interceptor rejects the request). In this case, the
response handler is never invoked, which can lead to necessary cleanups
not being performed. There are two ways to handle this. One is to expect
every callsite that sends a message to try/catch these exceptions and
handle them appropriately. The other is merely to invoke the response
handler to handle the exception, which is already equipped to handle
transport exceptions.
  • Loading branch information
jasontedor committed Mar 17, 2020
1 parent ab94fc0 commit 7ec7786
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -573,37 +572,57 @@ public <T extends TransportResponse> TransportFuture<T> submitRequest(DiscoveryN
public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
final TransportRequest request,
final TransportResponseHandler<T> handler) {
final Transport.Connection connection;
try {
Transport.Connection connection = getConnection(node);
sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
} catch (NodeNotConnectedException ex) {
connection = getConnection(node);
} catch (final NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
return;
}
sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
}

public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
final Transport.Connection connection;
try {
Transport.Connection connection = getConnection(node);
sendRequest(connection, action, request, options, handler);
} catch (NodeNotConnectedException ex) {
connection = getConnection(node);
} catch (final NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
return;
}
sendRequest(connection, action, request, options, handler);
}

/**
* Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked.
*
* @param connection the connection to send the request on
* @param action the name of the action
* @param request the request
* @param options the options for this request
* @param handler the response handler
* @param <T> the type of the transport response
*/
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
try {
asyncSender.sendRequest(connection, action, request, options, handler);
} catch (NodeNotConnectedException ex) {
} catch (final Exception ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
final TransportException te;
if (ex instanceof TransportException) {
te = (TransportException) ex;
} else {
te = new TransportException("failure to send", ex);
}
handler.handleException(te);
}
}

Expand All @@ -623,13 +642,15 @@ public final <T extends TransportResponse> void sendChildRequest(final Discovery
final TransportRequest request, final Task parentTask,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
final Transport.Connection connection;
try {
Transport.Connection connection = getConnection(node);
sendChildRequest(connection, action, request, parentTask, options, handler);
} catch (NodeNotConnectedException ex) {
connection = getConnection(node);
} catch (final NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
return;
}
sendChildRequest(connection, action, request, parentTask, options, handler);
}

public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
Expand All @@ -643,16 +664,7 @@ public <T extends TransportResponse> void sendChildRequest(final Transport.Conne
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
request.setParentTask(localNode.getId(), parentTask.getId());
try {
sendRequest(connection, action, request, options, handler);
} catch (TaskCancelledException ex) {
// The parent task is already cancelled - just fail the request
handler.handleException(new TransportException(ex));
} catch (NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
}

sendRequest(connection, action, request, options, handler);
}

private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,13 @@ public static MockNioTransport newMockTransport(Settings settings, Version versi

public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool,
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
return new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
return createNewService(settings, transport, version, threadPool, clusterSettings, taskHeaders, NOOP_TRANSPORT_INTERCEPTOR);
}

public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool,
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders,
TransportInterceptor interceptor) {
return new MockTransportService(settings, transport, threadPool, interceptor,
boundAddress ->
new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), UUIDs.randomBase64UUID(), boundAddress.publishAddress(),
Node.NODE_ATTRIBUTES.getAsMap(settings), DiscoveryNode.getRolesFromSettings(settings), version),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,15 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.test.ESTestCase.getPortRange;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
Expand Down Expand Up @@ -186,7 +188,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
}

private MockTransportService buildService(final String name, final Version version, @Nullable ClusterSettings clusterSettings,
Settings settings, boolean acceptRequests, boolean doHandshake) {
Settings settings, boolean acceptRequests, boolean doHandshake,
TransportInterceptor interceptor) {
Settings updatedSettings = Settings.builder()
.put(TransportSettings.PORT.getKey(), getPortRange())
.put(settings)
Expand All @@ -197,14 +200,19 @@ private MockTransportService buildService(final String name, final Version versi
}
Transport transport = build(updatedSettings, version, clusterSettings, doHandshake);
MockTransportService service = MockTransportService.createNewService(updatedSettings, transport, version, threadPool,
clusterSettings, Collections.emptySet());
clusterSettings, Collections.emptySet(), interceptor);
service.start();
if (acceptRequests) {
service.acceptIncomingRequests();
}
return service;
}

private MockTransportService buildService(final String name, final Version version, @Nullable ClusterSettings clusterSettings,
Settings settings, boolean acceptRequests, boolean doHandshake) {
return buildService(name, version, clusterSettings, settings, acceptRequests, doHandshake, NOOP_TRANSPORT_INTERCEPTOR);
}

protected MockTransportService buildService(final String name, final Version version, Settings settings) {
return buildService(name, version, null, settings);
}
Expand Down Expand Up @@ -2743,6 +2751,95 @@ public void onConnectionClosed(Transport.Connection connection) {
}
}

// test that the response handler is invoked on a failure to send
public void testFailToSend() throws InterruptedException {
final RuntimeException failToSendException;
if (randomBoolean()) {
failToSendException = new IllegalStateException("fail to send");
} else {
failToSendException = new TransportException("fail to send");
}
final TransportInterceptor interceptor = new TransportInterceptor() {
@Override
public AsyncSender interceptSender(final AsyncSender sender) {
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(
final Transport.Connection connection,
final String action,
final TransportRequest request,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
if ("fail-to-send-action".equals(action)) {
throw failToSendException;
} else {
sender.sendRequest(connection, action, request, options, handler);
}
}
};
}
};
try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, null, Settings.EMPTY, true, true, interceptor)) {
serviceC.start();
serviceC.acceptIncomingRequests();
final CountDownLatch latch = new CountDownLatch(1);
serviceC.connectToNode(
serviceA.getLocalDiscoNode(),
ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY),
new ActionListener<>() {
@Override
public void onResponse(final Void v) {
latch.countDown();
}

@Override
public void onFailure(final Exception e) {
fail(e.getMessage());
}
});
latch.await();
final AtomicReference<TransportException> te = new AtomicReference<>();
final Transport.Connection connection = serviceC.getConnection(nodeA);
serviceC.sendRequest(
connection,
"fail-to-send-action",
TransportRequest.Empty.INSTANCE,
TransportRequestOptions.EMPTY,
new TransportResponseHandler<TransportResponse>() {
@Override
public void handleResponse(final TransportResponse response) {
fail("handle response should not be invoked");
}

@Override
public void handleException(final TransportException exp) {
te.set(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public TransportResponse read(final StreamInput in) {
return TransportResponse.Empty.INSTANCE;
}
});
assertThat(te.get(), not(nullValue()));

if (failToSendException instanceof IllegalStateException) {
assertThat(te.get().getMessage(), equalTo("failure to send"));
assertThat(te.get().getCause(), instanceOf(IllegalStateException.class));
assertThat(te.get().getCause().getMessage(), equalTo("fail to send"));
} else {
assertThat(te.get().getMessage(), equalTo("fail to send"));
assertThat(te.get().getCause(), nullValue());
}
}

}

private void closeConnectionChannel(Transport.Connection connection) {
StubbableTransport.WrappedConnection wrappedConnection = (StubbableTransport.WrappedConnection) connection;
TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) wrappedConnection.getConnection();
Expand Down

0 comments on commit 7ec7786

Please sign in to comment.