diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java index 6cbb9ee98b..234d03dfa6 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java @@ -22,6 +22,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelProgressivePromise; +import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; @@ -277,7 +278,8 @@ private ListenableFuture sendRequestWithNewChannel(// @Override protected void onSuccess(List addresses) { NettyConnectListener connectListener = new NettyConnectListener<>(future, NettyRequestSender.this, channelManager, channelPreempted, partitionKey); - new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, future.getTimeoutsHolder(), closed, config).connect(bootstrap, connectListener); + new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, future.getTimeoutsHolder(), closed, config).connect(bootstrap, + connectListener); } @Override @@ -327,12 +329,21 @@ public void writeRequest(NettyResponseFuture future, Channel channel) { boolean writeBody = !future.isDontWriteBodyBecauseExpectContinue() && httpRequest.getMethod() != HttpMethod.CONNECT && nettyRequest.getBody() != null; if (!future.isHeadersAlreadyWrittenOnContinue()) { - if (future.getAsyncHandler() instanceof AsyncHandlerExtensions) - AsyncHandlerExtensions.class.cast(future.getAsyncHandler()).onRequestSend(nettyRequest); - - ChannelProgressivePromise promise = channel.newProgressivePromise(); - ChannelFuture f = writeBody ? channel.write(httpRequest, promise) : channel.writeAndFlush(httpRequest, promise); - f.addListener(new ProgressListener(future.getAsyncHandler(), future, true, 0L)); + if (handler instanceof AsyncHandlerExtensions) { + AsyncHandlerExtensions.class.cast(handler).onRequestSend(nettyRequest); + } + + // if the request has a body, we want to track progress + if (writeBody) { + ChannelProgressivePromise promise = channel.newProgressivePromise(); + ChannelFuture f = channel.write(httpRequest, promise); + f.addListener(new WriteProgressListener(future, true, 0L)); + } else { + // we can just track write completion + ChannelPromise promise = channel.newPromise(); + ChannelFuture f = channel.writeAndFlush(httpRequest, promise); + f.addListener(new WriteCompleteListener(future)); + } } if (writeBody) @@ -388,7 +399,7 @@ public void handleUnexpectedClosedChannel(Channel channel, NettyResponseFuture { + + public WriteCompleteListener(NettyResponseFuture future) { + super(future, true); + } + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + operationComplete(future.channel(), future.cause()); + } +} diff --git a/client/src/main/java/org/asynchttpclient/netty/request/ProgressListener.java b/client/src/main/java/org/asynchttpclient/netty/request/WriteListener.java old mode 100755 new mode 100644 similarity index 55% rename from client/src/main/java/org/asynchttpclient/netty/request/ProgressListener.java rename to client/src/main/java/org/asynchttpclient/netty/request/WriteListener.java index 3f5b9fe421..9ba2cefa67 --- a/client/src/main/java/org/asynchttpclient/netty/request/ProgressListener.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/WriteListener.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 AsyncHttpClient Project. All rights reserved. + * Copyright (c) 2016 AsyncHttpClient Project. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. @@ -14,12 +14,9 @@ package org.asynchttpclient.netty.request; import io.netty.channel.Channel; -import io.netty.channel.ChannelProgressiveFuture; -import io.netty.channel.ChannelProgressiveFutureListener; import java.nio.channels.ClosedChannelException; -import org.asynchttpclient.AsyncHandler; import org.asynchttpclient.handler.ProgressAsyncHandler; import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.channel.ChannelState; @@ -28,33 +25,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ProgressListener implements ChannelProgressiveFutureListener { +public abstract class WriteListener { - private static final Logger LOGGER = LoggerFactory.getLogger(ProgressListener.class); + private static final Logger LOGGER = LoggerFactory.getLogger(WriteListener.class); + protected final NettyResponseFuture future; + protected final ProgressAsyncHandler progressAsyncHandler; + protected final boolean notifyHeaders; - private final AsyncHandler asyncHandler; - private final NettyResponseFuture future; - private final boolean notifyHeaders; - private final long expectedTotal; - private long lastProgress = 0L; - - public ProgressListener(AsyncHandler asyncHandler,// - NettyResponseFuture future,// - boolean notifyHeaders,// - long expectedTotal) { - this.asyncHandler = asyncHandler; + public WriteListener(NettyResponseFuture future, boolean notifyHeaders) { this.future = future; + this.progressAsyncHandler = future.getAsyncHandler() instanceof ProgressAsyncHandler ? (ProgressAsyncHandler) future.getAsyncHandler() : null; this.notifyHeaders = notifyHeaders; - this.expectedTotal = expectedTotal; } - private boolean abortOnThrowable(Throwable cause, Channel channel) { - + private boolean abortOnThrowable(Channel channel, Throwable cause) { if (cause != null && future.getChannelState() != ChannelState.NEW) { if (cause instanceof IllegalStateException || cause instanceof ClosedChannelException || StackTraceInspector.recoverOnReadOrWriteException(cause)) { LOGGER.debug(cause.getMessage(), cause); Channels.silentlyCloseChannel(channel); - + } else { future.abort(cause); } @@ -64,24 +53,23 @@ private boolean abortOnThrowable(Throwable cause, Channel channel) { return false; } - @Override - public void operationComplete(ChannelProgressiveFuture cf) { + protected void operationComplete(Channel channel, Throwable cause) { + future.touch(); + // The write operation failed. If the channel was cached, it means it got asynchronously closed. // Let's retry a second time. - if (!abortOnThrowable(cf.cause(), cf.channel())) { - - future.touch(); + if (abortOnThrowable(channel, cause)) { + return; + } + if (progressAsyncHandler != null) { /** - * We need to make sure we aren't in the middle of an authorization - * process before publishing events as we will re-publish again the - * same event after the authorization, causing unpredictable - * behavior. + * We need to make sure we aren't in the middle of an authorization process before publishing events as we will re-publish again the same event after the authorization, + * causing unpredictable behavior. */ boolean startPublishing = !future.getInAuth().get() && !future.getInProxyAuth().get(); - - if (startPublishing && asyncHandler instanceof ProgressAsyncHandler) { - ProgressAsyncHandler progressAsyncHandler = (ProgressAsyncHandler) asyncHandler; + if (startPublishing) { + if (notifyHeaders) { progressAsyncHandler.onHeadersWritten(); } else { @@ -90,16 +78,4 @@ public void operationComplete(ChannelProgressiveFuture cf) { } } } - - @Override - public void operationProgressed(ChannelProgressiveFuture f, long progress, long total) { - future.touch(); - if (!notifyHeaders && asyncHandler instanceof ProgressAsyncHandler) { - long lastLastProgress = lastProgress; - lastProgress = progress; - if (total < 0) - total = expectedTotal; - ProgressAsyncHandler.class.cast(asyncHandler).onContentWriteProgress(progress - lastLastProgress, progress, total); - } - } } diff --git a/client/src/main/java/org/asynchttpclient/netty/request/WriteProgressListener.java b/client/src/main/java/org/asynchttpclient/netty/request/WriteProgressListener.java new file mode 100755 index 0000000000..6abe955e0e --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/netty/request/WriteProgressListener.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2014 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.netty.request; + +import io.netty.channel.ChannelProgressiveFuture; +import io.netty.channel.ChannelProgressiveFutureListener; + +import org.asynchttpclient.netty.NettyResponseFuture; + +public class WriteProgressListener extends WriteListener implements ChannelProgressiveFutureListener { + + private final long expectedTotal; + private long lastProgress = 0L; + + public WriteProgressListener(NettyResponseFuture future,// + boolean notifyHeaders,// + long expectedTotal) { + super(future, notifyHeaders); + this.expectedTotal = expectedTotal; + } + + @Override + public void operationComplete(ChannelProgressiveFuture cf) { + operationComplete(cf.channel(), cf.cause()); + } + + @Override + public void operationProgressed(ChannelProgressiveFuture f, long progress, long total) { + future.touch(); + + if (progressAsyncHandler != null && !notifyHeaders) { + long lastLastProgress = lastProgress; + lastProgress = progress; + if (total < 0) { + total = expectedTotal; + } + progressAsyncHandler.onContentWriteProgress(progress - lastLastProgress, progress, total); + } + } +} diff --git a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java index 4f85296a5d..581d2b85a5 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java @@ -25,7 +25,7 @@ import org.asynchttpclient.AsyncHttpClientConfig; import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.channel.ChannelManager; -import org.asynchttpclient.netty.request.ProgressListener; +import org.asynchttpclient.netty.request.WriteProgressListener; import org.asynchttpclient.request.body.Body; import org.asynchttpclient.request.body.RandomAccessBody; import org.asynchttpclient.request.body.generator.BodyGenerator; @@ -75,19 +75,21 @@ public void write(final Channel channel, NettyResponseFuture future) throws I public void onContentAdded() { chunkedWriteHandler.resumeTransfer(); } + @Override - public void onError(Throwable t) {} + public void onError(Throwable t) { + } }); } } - ChannelFuture writeFuture = channel.write(msg, channel.newProgressivePromise()); - writeFuture.addListener(new ProgressListener(future.getAsyncHandler(), future, false, getContentLength()) { + ChannelFuture writeFuture = channel.write(msg, channel.newProgressivePromise()); + writeFuture.addListener(new WriteProgressListener(future, false, getContentLength()) { public void operationComplete(ChannelProgressiveFuture cf) { closeSilently(body); super.operationComplete(cf); } }); - channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, channel.voidPromise()); } } diff --git a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyFileBody.java b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyFileBody.java index f824aa65ad..79227d825c 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyFileBody.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyFileBody.java @@ -26,7 +26,7 @@ import org.asynchttpclient.AsyncHttpClientConfig; import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.channel.ChannelManager; -import org.asynchttpclient.netty.request.ProgressListener; +import org.asynchttpclient.netty.request.WriteProgressListener; public class NettyFileBody implements NettyBody { @@ -78,7 +78,7 @@ public void write(Channel channel, NettyResponseFuture future) throws IOExcep : new DefaultFileRegion(fileChannel, offset, length); channel.write(message, channel.newProgressivePromise())// - .addListener(new ProgressListener(future.getAsyncHandler(), future, false, getContentLength())); - channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + .addListener(new WriteProgressListener(future, false, getContentLength())); + channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, channel.voidPromise()); } } diff --git a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyInputStreamBody.java b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyInputStreamBody.java index 97fea3881f..1e0019d3cd 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyInputStreamBody.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyInputStreamBody.java @@ -16,7 +16,7 @@ import static org.asynchttpclient.util.MiscUtils.closeSilently; import org.asynchttpclient.netty.NettyResponseFuture; -import org.asynchttpclient.netty.request.ProgressListener; +import org.asynchttpclient.netty.request.WriteProgressListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,12 +68,12 @@ public void write(Channel channel, NettyResponseFuture future) throws IOExcep } channel.write(new ChunkedStream(is), channel.newProgressivePromise()).addListener( - new ProgressListener(future.getAsyncHandler(), future, false, getContentLength()) { + new WriteProgressListener(future, false, getContentLength()) { public void operationComplete(ChannelProgressiveFuture cf) { closeSilently(is); super.operationComplete(cf); } }); - channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, channel.voidPromise()); } } diff --git a/client/src/main/java/org/asynchttpclient/netty/ws/NettyWebSocket.java b/client/src/main/java/org/asynchttpclient/netty/ws/NettyWebSocket.java index 1af5e2212b..e9639e845f 100755 --- a/client/src/main/java/org/asynchttpclient/netty/ws/NettyWebSocket.java +++ b/client/src/main/java/org/asynchttpclient/netty/ws/NettyWebSocket.java @@ -87,43 +87,43 @@ public SocketAddress getLocalAddress() { @Override public WebSocket sendMessage(byte[] message) { - channel.writeAndFlush(new BinaryWebSocketFrame(wrappedBuffer(message))); + channel.writeAndFlush(new BinaryWebSocketFrame(wrappedBuffer(message)), channel.voidPromise()); return this; } @Override public WebSocket stream(byte[] fragment, boolean last) { - channel.writeAndFlush(new BinaryWebSocketFrame(last, 0, wrappedBuffer(fragment))); + channel.writeAndFlush(new BinaryWebSocketFrame(last, 0, wrappedBuffer(fragment)), channel.voidPromise()); return this; } @Override public WebSocket stream(byte[] fragment, int offset, int len, boolean last) { - channel.writeAndFlush(new BinaryWebSocketFrame(last, 0, wrappedBuffer(fragment, offset, len))); + channel.writeAndFlush(new BinaryWebSocketFrame(last, 0, wrappedBuffer(fragment, offset, len)), channel.voidPromise()); return this; } @Override public WebSocket sendMessage(String message) { - channel.writeAndFlush(new TextWebSocketFrame(message)); + channel.writeAndFlush(new TextWebSocketFrame(message), channel.voidPromise()); return this; } @Override public WebSocket stream(String fragment, boolean last) { - channel.writeAndFlush(new TextWebSocketFrame(last, 0, fragment)); + channel.writeAndFlush(new TextWebSocketFrame(last, 0, fragment), channel.voidPromise()); return this; } @Override public WebSocket sendPing(byte[] payload) { - channel.writeAndFlush(new PingWebSocketFrame(wrappedBuffer(payload))); + channel.writeAndFlush(new PingWebSocketFrame(wrappedBuffer(payload)), channel.voidPromise()); return this; } @Override public WebSocket sendPong(byte[] payload) { - channel.writeAndFlush(new PongWebSocketFrame(wrappedBuffer(payload))); + channel.writeAndFlush(new PongWebSocketFrame(wrappedBuffer(payload)), channel.voidPromise()); return this; }