Skip to content

Commit

Permalink
Don't use a ProgressivePromise when we write a request without a body ,
Browse files Browse the repository at this point in the history
close #1144
  • Loading branch information
slandelle committed Apr 18, 2016
1 parent 79ddf22 commit 214522c
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 71 deletions.
Expand Up @@ -22,6 +22,7 @@
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelProgressivePromise; import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpMethod;
Expand Down Expand Up @@ -277,7 +278,8 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
@Override @Override
protected void onSuccess(List<InetSocketAddress> addresses) { protected void onSuccess(List<InetSocketAddress> addresses) {
NettyConnectListener<T> connectListener = new NettyConnectListener<>(future, NettyRequestSender.this, channelManager, channelPreempted, partitionKey); NettyConnectListener<T> connectListener = new NettyConnectListener<>(future, NettyRequestSender.this, channelManager, channelPreempted, partitionKey);
new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, future.getTimeoutsHolder(), closed, config).connect(bootstrap, connectListener); new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, future.getTimeoutsHolder(), closed, config).connect(bootstrap,
connectListener);
} }


@Override @Override
Expand Down Expand Up @@ -327,12 +329,21 @@ public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
boolean writeBody = !future.isDontWriteBodyBecauseExpectContinue() && httpRequest.getMethod() != HttpMethod.CONNECT && nettyRequest.getBody() != null; boolean writeBody = !future.isDontWriteBodyBecauseExpectContinue() && httpRequest.getMethod() != HttpMethod.CONNECT && nettyRequest.getBody() != null;


if (!future.isHeadersAlreadyWrittenOnContinue()) { if (!future.isHeadersAlreadyWrittenOnContinue()) {
if (future.getAsyncHandler() instanceof AsyncHandlerExtensions) if (handler instanceof AsyncHandlerExtensions) {
AsyncHandlerExtensions.class.cast(future.getAsyncHandler()).onRequestSend(nettyRequest); AsyncHandlerExtensions.class.cast(handler).onRequestSend(nettyRequest);

}
ChannelProgressivePromise promise = channel.newProgressivePromise();
ChannelFuture f = writeBody ? channel.write(httpRequest, promise) : channel.writeAndFlush(httpRequest, promise); // if the request has a body, we want to track progress
f.addListener(new ProgressListener(future.getAsyncHandler(), future, true, 0L)); if (writeBody) {
ChannelProgressivePromise promise = channel.newProgressivePromise();
ChannelFuture f = channel.write(httpRequest, promise);
f.addListener(new WriteProgressListener(future, true, 0L));
} else {
// we can just track write completion
ChannelPromise promise = channel.newPromise();
ChannelFuture f = channel.writeAndFlush(httpRequest, promise);
f.addListener(new WriteCompleteListener(future));
}
} }


if (writeBody) if (writeBody)
Expand Down Expand Up @@ -388,7 +399,7 @@ public void handleUnexpectedClosedChannel(Channel channel, NettyResponseFuture<?
} else if (retry(future)) { } else if (retry(future)) {
future.pendingException = null; future.pendingException = null;
} else { } else {
abort(channel, future, future.pendingException != null? future.pendingException : RemotelyClosedException.INSTANCE); abort(channel, future, future.pendingException != null ? future.pendingException : RemotelyClosedException.INSTANCE);
} }
} }


Expand Down
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2016 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient.netty.request;

import org.asynchttpclient.netty.NettyResponseFuture;

import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;

public class WriteCompleteListener extends WriteListener implements GenericFutureListener<ChannelFuture> {

public WriteCompleteListener(NettyResponseFuture<?> future) {
super(future, true);
}

@Override
public void operationComplete(ChannelFuture future) throws Exception {
operationComplete(future.channel(), future.cause());
}
}
66 changes: 21 additions & 45 deletions ...lient/netty/request/ProgressListener.java → ...tpclient/netty/request/WriteListener.java 100755 → 100644
@@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2014 AsyncHttpClient Project. All rights reserved. * Copyright (c) 2016 AsyncHttpClient Project. All rights reserved.
* *
* This program is licensed to you under the Apache License Version 2.0, * This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0. * and you may not use this file except in compliance with the Apache License Version 2.0.
Expand All @@ -14,12 +14,9 @@
package org.asynchttpclient.netty.request; package org.asynchttpclient.netty.request;


import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelProgressiveFuture;
import io.netty.channel.ChannelProgressiveFutureListener;


import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;


import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.handler.ProgressAsyncHandler; import org.asynchttpclient.handler.ProgressAsyncHandler;
import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.NettyResponseFuture;
import org.asynchttpclient.netty.channel.ChannelState; import org.asynchttpclient.netty.channel.ChannelState;
Expand All @@ -28,33 +25,25 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


public class ProgressListener implements ChannelProgressiveFutureListener { public abstract class WriteListener {


private static final Logger LOGGER = LoggerFactory.getLogger(ProgressListener.class); private static final Logger LOGGER = LoggerFactory.getLogger(WriteListener.class);
protected final NettyResponseFuture<?> future;
protected final ProgressAsyncHandler<?> progressAsyncHandler;
protected final boolean notifyHeaders;


private final AsyncHandler<?> asyncHandler; public WriteListener(NettyResponseFuture<?> future, boolean notifyHeaders) {
private final NettyResponseFuture<?> future;
private final boolean notifyHeaders;
private final long expectedTotal;
private long lastProgress = 0L;

public ProgressListener(AsyncHandler<?> asyncHandler,//
NettyResponseFuture<?> future,//
boolean notifyHeaders,//
long expectedTotal) {
this.asyncHandler = asyncHandler;
this.future = future; this.future = future;
this.progressAsyncHandler = future.getAsyncHandler() instanceof ProgressAsyncHandler ? (ProgressAsyncHandler<?>) future.getAsyncHandler() : null;
this.notifyHeaders = notifyHeaders; this.notifyHeaders = notifyHeaders;
this.expectedTotal = expectedTotal;
} }


private boolean abortOnThrowable(Throwable cause, Channel channel) { private boolean abortOnThrowable(Channel channel, Throwable cause) {

if (cause != null && future.getChannelState() != ChannelState.NEW) { if (cause != null && future.getChannelState() != ChannelState.NEW) {
if (cause instanceof IllegalStateException || cause instanceof ClosedChannelException || StackTraceInspector.recoverOnReadOrWriteException(cause)) { if (cause instanceof IllegalStateException || cause instanceof ClosedChannelException || StackTraceInspector.recoverOnReadOrWriteException(cause)) {
LOGGER.debug(cause.getMessage(), cause); LOGGER.debug(cause.getMessage(), cause);
Channels.silentlyCloseChannel(channel); Channels.silentlyCloseChannel(channel);

} else { } else {
future.abort(cause); future.abort(cause);
} }
Expand All @@ -64,24 +53,23 @@ private boolean abortOnThrowable(Throwable cause, Channel channel) {
return false; return false;
} }


@Override protected void operationComplete(Channel channel, Throwable cause) {
public void operationComplete(ChannelProgressiveFuture cf) { future.touch();

// The write operation failed. If the channel was cached, it means it got asynchronously closed. // The write operation failed. If the channel was cached, it means it got asynchronously closed.
// Let's retry a second time. // Let's retry a second time.
if (!abortOnThrowable(cf.cause(), cf.channel())) { if (abortOnThrowable(channel, cause)) {

return;
future.touch(); }


if (progressAsyncHandler != null) {
/** /**
* We need to make sure we aren't in the middle of an authorization * We need to make sure we aren't in the middle of an authorization process before publishing events as we will re-publish again the same event after the authorization,
* process before publishing events as we will re-publish again the * causing unpredictable behavior.
* same event after the authorization, causing unpredictable
* behavior.
*/ */
boolean startPublishing = !future.getInAuth().get() && !future.getInProxyAuth().get(); boolean startPublishing = !future.getInAuth().get() && !future.getInProxyAuth().get();

if (startPublishing) {
if (startPublishing && asyncHandler instanceof ProgressAsyncHandler) {
ProgressAsyncHandler<?> progressAsyncHandler = (ProgressAsyncHandler<?>) asyncHandler;
if (notifyHeaders) { if (notifyHeaders) {
progressAsyncHandler.onHeadersWritten(); progressAsyncHandler.onHeadersWritten();
} else { } else {
Expand All @@ -90,16 +78,4 @@ public void operationComplete(ChannelProgressiveFuture cf) {
} }
} }
} }

@Override
public void operationProgressed(ChannelProgressiveFuture f, long progress, long total) {
future.touch();
if (!notifyHeaders && asyncHandler instanceof ProgressAsyncHandler) {
long lastLastProgress = lastProgress;
lastProgress = progress;
if (total < 0)
total = expectedTotal;
ProgressAsyncHandler.class.cast(asyncHandler).onContentWriteProgress(progress - lastLastProgress, progress, total);
}
}
} }
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2014 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient.netty.request;

import io.netty.channel.ChannelProgressiveFuture;
import io.netty.channel.ChannelProgressiveFutureListener;

import org.asynchttpclient.netty.NettyResponseFuture;

public class WriteProgressListener extends WriteListener implements ChannelProgressiveFutureListener {

private final long expectedTotal;
private long lastProgress = 0L;

public WriteProgressListener(NettyResponseFuture<?> future,//
boolean notifyHeaders,//
long expectedTotal) {
super(future, notifyHeaders);
this.expectedTotal = expectedTotal;
}

@Override
public void operationComplete(ChannelProgressiveFuture cf) {
operationComplete(cf.channel(), cf.cause());
}

@Override
public void operationProgressed(ChannelProgressiveFuture f, long progress, long total) {
future.touch();

if (progressAsyncHandler != null && !notifyHeaders) {
long lastLastProgress = lastProgress;
lastProgress = progress;
if (total < 0) {
total = expectedTotal;
}
progressAsyncHandler.onContentWriteProgress(progress - lastLastProgress, progress, total);
}
}
}
Expand Up @@ -25,7 +25,7 @@
import org.asynchttpclient.AsyncHttpClientConfig; import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.NettyResponseFuture;
import org.asynchttpclient.netty.channel.ChannelManager; import org.asynchttpclient.netty.channel.ChannelManager;
import org.asynchttpclient.netty.request.ProgressListener; import org.asynchttpclient.netty.request.WriteProgressListener;
import org.asynchttpclient.request.body.Body; import org.asynchttpclient.request.body.Body;
import org.asynchttpclient.request.body.RandomAccessBody; import org.asynchttpclient.request.body.RandomAccessBody;
import org.asynchttpclient.request.body.generator.BodyGenerator; import org.asynchttpclient.request.body.generator.BodyGenerator;
Expand Down Expand Up @@ -75,19 +75,21 @@ public void write(final Channel channel, NettyResponseFuture<?> future) throws I
public void onContentAdded() { public void onContentAdded() {
chunkedWriteHandler.resumeTransfer(); chunkedWriteHandler.resumeTransfer();
} }

@Override @Override
public void onError(Throwable t) {} public void onError(Throwable t) {
}
}); });
} }
} }
ChannelFuture writeFuture = channel.write(msg, channel.newProgressivePromise());


writeFuture.addListener(new ProgressListener(future.getAsyncHandler(), future, false, getContentLength()) { ChannelFuture writeFuture = channel.write(msg, channel.newProgressivePromise());
writeFuture.addListener(new WriteProgressListener(future, false, getContentLength()) {
public void operationComplete(ChannelProgressiveFuture cf) { public void operationComplete(ChannelProgressiveFuture cf) {
closeSilently(body); closeSilently(body);
super.operationComplete(cf); super.operationComplete(cf);
} }
}); });
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, channel.voidPromise());
} }
} }
Expand Up @@ -26,7 +26,7 @@
import org.asynchttpclient.AsyncHttpClientConfig; import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.NettyResponseFuture;
import org.asynchttpclient.netty.channel.ChannelManager; import org.asynchttpclient.netty.channel.ChannelManager;
import org.asynchttpclient.netty.request.ProgressListener; import org.asynchttpclient.netty.request.WriteProgressListener;


public class NettyFileBody implements NettyBody { public class NettyFileBody implements NettyBody {


Expand Down Expand Up @@ -78,7 +78,7 @@ public void write(Channel channel, NettyResponseFuture<?> future) throws IOExcep
: new DefaultFileRegion(fileChannel, offset, length); : new DefaultFileRegion(fileChannel, offset, length);


channel.write(message, channel.newProgressivePromise())// channel.write(message, channel.newProgressivePromise())//
.addListener(new ProgressListener(future.getAsyncHandler(), future, false, getContentLength())); .addListener(new WriteProgressListener(future, false, getContentLength()));
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, channel.voidPromise());
} }
} }
Expand Up @@ -16,7 +16,7 @@
import static org.asynchttpclient.util.MiscUtils.closeSilently; import static org.asynchttpclient.util.MiscUtils.closeSilently;


import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.NettyResponseFuture;
import org.asynchttpclient.netty.request.ProgressListener; import org.asynchttpclient.netty.request.WriteProgressListener;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -68,12 +68,12 @@ public void write(Channel channel, NettyResponseFuture<?> future) throws IOExcep
} }


channel.write(new ChunkedStream(is), channel.newProgressivePromise()).addListener( channel.write(new ChunkedStream(is), channel.newProgressivePromise()).addListener(
new ProgressListener(future.getAsyncHandler(), future, false, getContentLength()) { new WriteProgressListener(future, false, getContentLength()) {
public void operationComplete(ChannelProgressiveFuture cf) { public void operationComplete(ChannelProgressiveFuture cf) {
closeSilently(is); closeSilently(is);
super.operationComplete(cf); super.operationComplete(cf);
} }
}); });
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, channel.voidPromise());
} }
} }
Expand Up @@ -87,43 +87,43 @@ public SocketAddress getLocalAddress() {


@Override @Override
public WebSocket sendMessage(byte[] message) { public WebSocket sendMessage(byte[] message) {
channel.writeAndFlush(new BinaryWebSocketFrame(wrappedBuffer(message))); channel.writeAndFlush(new BinaryWebSocketFrame(wrappedBuffer(message)), channel.voidPromise());
return this; return this;
} }


@Override @Override
public WebSocket stream(byte[] fragment, boolean last) { public WebSocket stream(byte[] fragment, boolean last) {
channel.writeAndFlush(new BinaryWebSocketFrame(last, 0, wrappedBuffer(fragment))); channel.writeAndFlush(new BinaryWebSocketFrame(last, 0, wrappedBuffer(fragment)), channel.voidPromise());
return this; return this;
} }


@Override @Override
public WebSocket stream(byte[] fragment, int offset, int len, boolean last) { public WebSocket stream(byte[] fragment, int offset, int len, boolean last) {
channel.writeAndFlush(new BinaryWebSocketFrame(last, 0, wrappedBuffer(fragment, offset, len))); channel.writeAndFlush(new BinaryWebSocketFrame(last, 0, wrappedBuffer(fragment, offset, len)), channel.voidPromise());
return this; return this;
} }


@Override @Override
public WebSocket sendMessage(String message) { public WebSocket sendMessage(String message) {
channel.writeAndFlush(new TextWebSocketFrame(message)); channel.writeAndFlush(new TextWebSocketFrame(message), channel.voidPromise());
return this; return this;
} }


@Override @Override
public WebSocket stream(String fragment, boolean last) { public WebSocket stream(String fragment, boolean last) {
channel.writeAndFlush(new TextWebSocketFrame(last, 0, fragment)); channel.writeAndFlush(new TextWebSocketFrame(last, 0, fragment), channel.voidPromise());
return this; return this;
} }


@Override @Override
public WebSocket sendPing(byte[] payload) { public WebSocket sendPing(byte[] payload) {
channel.writeAndFlush(new PingWebSocketFrame(wrappedBuffer(payload))); channel.writeAndFlush(new PingWebSocketFrame(wrappedBuffer(payload)), channel.voidPromise());
return this; return this;
} }


@Override @Override
public WebSocket sendPong(byte[] payload) { public WebSocket sendPong(byte[] payload) {
channel.writeAndFlush(new PongWebSocketFrame(wrappedBuffer(payload))); channel.writeAndFlush(new PongWebSocketFrame(wrappedBuffer(payload)), channel.voidPromise());
return this; return this;
} }


Expand Down

0 comments on commit 214522c

Please sign in to comment.