Skip to content

Commit

Permalink
Don't throw exceptions when eventloop has been closed
Browse files Browse the repository at this point in the history
  • Loading branch information
slandelle committed Jan 15, 2016
1 parent f5c8ad4 commit 42a329e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 10 deletions.
Expand Up @@ -29,7 +29,7 @@ public final void operationComplete(ChannelFuture future) throws Exception {
}
}

public abstract void onSuccess(Channel channel) throws Exception;
public abstract void onSuccess(Channel channel);

public abstract void onFailure(Channel channel, Throwable cause) throws Exception;
public abstract void onFailure(Channel channel, Throwable cause);
}
Expand Up @@ -78,7 +78,7 @@ private void writeRequest(Channel channel) {
}

@Override
public void onSuccess(Channel channel) throws Exception {
public void onSuccess(Channel channel) {

Request request = future.getTargetRequest();
Uri uri = request.getUri();
Expand Down Expand Up @@ -115,8 +115,8 @@ protected void onFailure(Throwable cause) throws Exception {
}

@Override
public void onFailure(Channel channel, Throwable cause) throws Exception {

public void onFailure(Channel channel, Throwable cause) {
//beware, channel can be null
abortChannelPreemption();

boolean canRetry = future.canRetry();
Expand Down
Expand Up @@ -20,6 +20,8 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.handler.AsyncHandlerExtensions;
Expand All @@ -33,13 +35,16 @@ public class NettyChannelConnector {
private final InetSocketAddress localAddress;
private final List<InetSocketAddress> remoteAddresses;
private final TimeoutsHolder timeoutsHolder;
private final AtomicBoolean closed;
private volatile int i = 0;

public NettyChannelConnector(InetAddress localAddress, List<InetSocketAddress> remoteAddresses, AsyncHandler<?> asyncHandler, TimeoutsHolder timeoutsHolder) {
public NettyChannelConnector(InetAddress localAddress, List<InetSocketAddress> remoteAddresses, AsyncHandler<?> asyncHandler, TimeoutsHolder timeoutsHolder,
AtomicBoolean closed) {
this.localAddress = localAddress != null ? new InetSocketAddress(localAddress, 0) : null;
this.remoteAddresses = remoteAddresses;
this.asyncHandlerExtensions = toAsyncHandlerExtensions(asyncHandler);
this.timeoutsHolder = timeoutsHolder;
this.closed = closed;
}

private boolean pickNextRemoteAddress() {
Expand All @@ -53,12 +58,24 @@ public void connect(final Bootstrap bootstrap, final NettyConnectListener<?> con
if (asyncHandlerExtensions != null)
asyncHandlerExtensions.onTcpConnectAttempt(remoteAddress);

final ChannelFuture future = localAddress != null ? bootstrap.connect(remoteAddress, localAddress) : bootstrap.connect(remoteAddress);
try {
connect0(bootstrap, connectListener, remoteAddress);
} catch (RejectedExecutionException e) {
if (closed.get()) {
connectListener.onFailure(null, e);
} else {
throw e;
}
}
}

private void connect0(Bootstrap bootstrap, final NettyConnectListener<?> connectListener, InetSocketAddress remoteAddress) {
final ChannelFuture future = bootstrap.connect(remoteAddress, localAddress);

future.addListener(new SimpleChannelFutureListener() {

@Override
public void onSuccess(Channel channel) throws Exception {
public void onSuccess(Channel channel) {
if (asyncHandlerExtensions != null) {
asyncHandlerExtensions.onTcpConnectSuccess(remoteAddress, future.channel());
}
Expand All @@ -67,7 +84,7 @@ public void onSuccess(Channel channel) throws Exception {
}

@Override
public void onFailure(Channel channel, Throwable t) throws Exception {
public void onFailure(Channel channel, Throwable t) {
if (asyncHandlerExtensions != null)
asyncHandlerExtensions.onTcpConnectFailure(remoteAddress, t);
boolean retry = pickNextRemoteAddress();
Expand Down
Expand Up @@ -277,7 +277,7 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
@Override
protected void onSuccess(List<InetSocketAddress> addresses) {
NettyConnectListener<T> connectListener = new NettyConnectListener<>(future, NettyRequestSender.this, channelManager, channelPreempted, partitionKey);
new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, future.getTimeoutsHolder()).connect(bootstrap, connectListener);
new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, future.getTimeoutsHolder(), closed).connect(bootstrap, connectListener);
}

@Override
Expand Down

0 comments on commit 42a329e

Please sign in to comment.