Skip to content

Commit

Permalink
Close channel when chunking with 401 and 407
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Landelle committed Jan 8, 2015
1 parent b606e09 commit 2535007
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 90 deletions.
53 changes: 27 additions & 26 deletions api/src/test/java/org/asynchttpclient/async/BasicAuthTest.java
Expand Up @@ -126,25 +126,24 @@ private static class RedirectHandler extends AbstractHandler {
public void handle(String s, Request r, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {

LOGGER.info("request: " + request.getRequestURI());
if ("/uff".equals(request.getRequestURI())) {

if ("/uff".equals(request.getRequestURI())) {
LOGGER.info("redirect to /bla");
response.setStatus(302);
response.setContentLength(0);
response.setHeader("Location", "/bla");
response.getOutputStream().flush();
response.getOutputStream().close();

return;

} else {
LOGGER.info("got redirected" + request.getRequestURI());
response.setStatus(200);
response.addHeader("X-Auth", request.getHeader("Authorization"));
response.addHeader("X-Content-Length", String.valueOf(request.getContentLength()));
response.setStatus(200);
response.getOutputStream().write("content".getBytes(UTF_8));
response.getOutputStream().flush();
response.getOutputStream().close();
byte[] b = "content".getBytes(UTF_8);
response.setContentLength(b.length);
response.getOutputStream().write(b);
}
response.getOutputStream().flush();
response.getOutputStream().close();
}
}

Expand All @@ -154,25 +153,27 @@ public void handle(String s, Request r, HttpServletRequest request, HttpServletR

if (request.getHeader("X-401") != null) {
response.setStatus(401);
response.getOutputStream().flush();
response.getOutputStream().close();

return;
}
response.addHeader("X-Auth", request.getHeader("Authorization"));
response.addHeader("X-Content-Length", String.valueOf(request.getContentLength()));
response.setStatus(200);
response.setContentLength(0);

int size = 10 * 1024;
if (request.getContentLength() > 0) {
size = request.getContentLength();
}
byte[] bytes = new byte[size];
if (bytes.length > 0) {
int read = request.getInputStream().read(bytes);
if (read > 0) {
response.getOutputStream().write(bytes, 0, read);
} else {
response.addHeader("X-Auth", request.getHeader("Authorization"));
response.addHeader("X-Content-Length", String.valueOf(request.getContentLength()));
response.setStatus(200);

int size = 10 * 1024;
if (request.getContentLength() > 0) {
size = request.getContentLength();
}
byte[] bytes = new byte[size];
int contentLength = 0;
if (bytes.length > 0) {
int read = request.getInputStream().read(bytes);
if (read > 0) {
contentLength = read;
response.getOutputStream().write(bytes, 0, read);
}
}
response.setContentLength(contentLength);
}
response.getOutputStream().flush();
response.getOutputStream().close();
Expand Down
Expand Up @@ -50,6 +50,7 @@ private class Relative302Handler extends AbstractHandler {
public void handle(String s, Request r, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException {

String param;
httpResponse.setStatus(200);
httpResponse.setContentType(TEXT_HTML_CONTENT_TYPE_WITH_UTF_8_CHARSET);
Enumeration<?> e = httpRequest.getHeaderNames();
while (e.hasMoreElements()) {
Expand All @@ -58,12 +59,10 @@ public void handle(String s, Request r, HttpServletRequest httpRequest, HttpServ
if (param.startsWith("X-redirect") && !isSet.getAndSet(true)) {
httpResponse.addHeader("Location", httpRequest.getHeader(param));
httpResponse.setStatus(302);
httpResponse.getOutputStream().flush();
httpResponse.getOutputStream().close();
return;
break;
}
}
httpResponse.setStatus(200);
httpResponse.setContentLength(0);
httpResponse.getOutputStream().flush();
httpResponse.getOutputStream().close();
}
Expand Down
Expand Up @@ -451,7 +451,11 @@ public void call() {
}

public void drainChannelAndOffer(final Channel channel, final NettyResponseFuture<?> future) {
Channels.setAttribute(channel, newDrainCallback(future, channel, future.isKeepAlive(), getPartitionId(future)));
drainChannelAndOffer(channel, future, future.isKeepAlive(), getPartitionId(future));
}

public void drainChannelAndOffer(final Channel channel, final NettyResponseFuture<?> future, boolean keepAlive, String poolKey) {
Channels.setAttribute(channel, newDrainCallback(future, channel, keepAlive, poolKey));
}

public void flushPartition(String partitionId) {
Expand Down
Expand Up @@ -35,10 +35,8 @@
import org.asynchttpclient.ntlm.NTLMEngine;
import org.asynchttpclient.ntlm.NTLMEngineException;
import org.asynchttpclient.providers.netty.commons.handler.ConnectionStrategy;
import org.asynchttpclient.providers.netty3.Callback;
import org.asynchttpclient.providers.netty3.NettyAsyncHttpProviderConfig;
import org.asynchttpclient.providers.netty3.channel.ChannelManager;
import org.asynchttpclient.providers.netty3.channel.Channels;
import org.asynchttpclient.providers.netty3.future.NettyResponseFuture;
import org.asynchttpclient.providers.netty3.request.NettyRequestSender;
import org.asynchttpclient.providers.netty3.response.NettyResponseBodyPart;
Expand Down Expand Up @@ -257,22 +255,13 @@ private boolean exitAfterHandling401(//
final Request nextRequest = new RequestBuilder(future.getRequest()).setHeaders(request.getHeaders()).setRealm(newRealm).build();

logger.debug("Sending authentication to {}", request.getUri());
if (future.isKeepAlive()) {
if (HttpHeaders.isTransferEncodingChunked(response)) {
Channels.setAttribute(channel, new Callback(future) {
public void call() throws IOException {
requestSender.drainChannelAndExecuteNextRequest(channel, future, nextRequest);
}
});
} else {
future.setReuseChannel(true);
requestSender.sendNextRequest(nextRequest, future);
}
if (future.isKeepAlive() && !HttpHeaders.isTransferEncodingChunked(response) && !response.isChunked()) {
future.setReuseChannel(true);
} else {
channelManager.closeChannel(channel);
requestSender.sendNextRequest(nextRequest, future);
}

requestSender.sendNextRequest(nextRequest, future);
return true;
}
}
Expand Down Expand Up @@ -336,22 +325,15 @@ private boolean exitAfterHandling407(//

final Request nextRequest = new RequestBuilder(future.getRequest()).setHeaders(request.getHeaders()).setRealm(newRealm).build();

logger.debug("Sending authentication to {}", request.getUri());
if (future.isKeepAlive()) {
logger.debug("Sending proxy authentication to {}", request.getUri());
if (future.isKeepAlive() && !HttpHeaders.isTransferEncodingChunked(response) && !response.isChunked()) {
future.setConnectAllowed(true);
future.setReuseChannel(true);
if (HttpHeaders.isTransferEncodingChunked(response)) {
Channels.setAttribute(channel, new Callback(future) {
public void call() throws IOException {
requestSender.drainChannelAndExecuteNextRequest(channel, future, nextRequest);
}
});
} else {
requestSender.sendNextRequest(nextRequest, future);
}
} else {
channelManager.closeChannel(channel);
requestSender.sendNextRequest(nextRequest, future);
}

requestSender.sendNextRequest(nextRequest, future);
return true;
}
}
Expand Down
Expand Up @@ -40,7 +40,6 @@
import org.asynchttpclient.filter.FilterException;
import org.asynchttpclient.filter.IOExceptionFilter;
import org.asynchttpclient.listener.TransferCompletionHandler;
import org.asynchttpclient.providers.netty3.Callback;
import org.asynchttpclient.providers.netty3.NettyAsyncHttpProviderConfig;
import org.asynchttpclient.providers.netty3.channel.ChannelManager;
import org.asynchttpclient.providers.netty3.channel.Channels;
Expand Down Expand Up @@ -540,18 +539,4 @@ public void replayRequest(final NettyResponseFuture<?> future, FilterContext fc,
public boolean isClosed() {
return closed.get();
}

public final Callback newExecuteNextRequestCallback(final NettyResponseFuture<?> future, final Request nextRequest) {

return new Callback(future) {
@Override
public void call() throws IOException {
sendNextRequest(nextRequest, future);
}
};
}

public void drainChannelAndExecuteNextRequest(final Channel channel, final NettyResponseFuture<?> future, Request nextRequest) {
Channels.setAttribute(channel, newExecuteNextRequestCallback(future, nextRequest));
}
}
Expand Up @@ -50,6 +50,7 @@
import org.asynchttpclient.providers.netty.commons.channel.pool.ChannelPoolPartitionSelector;
import org.asynchttpclient.providers.netty4.Callback;
import org.asynchttpclient.providers.netty4.NettyAsyncHttpProviderConfig;
import org.asynchttpclient.providers.netty4.channel.Channels;
import org.asynchttpclient.providers.netty4.channel.pool.ChannelPool;
import org.asynchttpclient.providers.netty4.channel.pool.DefaultChannelPool;
import org.asynchttpclient.providers.netty4.channel.pool.NoopChannelPool;
Expand Down Expand Up @@ -425,7 +426,11 @@ public void call() {
}

public void drainChannelAndOffer(final Channel channel, final NettyResponseFuture<?> future) {
Channels.setAttribute(channel, newDrainCallback(future, channel, future.isKeepAlive(), getPartitionId(future)));
drainChannelAndOffer(channel, future, future.isKeepAlive(), getPartitionId(future));
}

public void drainChannelAndOffer(final Channel channel, final NettyResponseFuture<?> future, boolean keepAlive, String poolKey) {
Channels.setAttribute(channel, newDrainCallback(future, channel, keepAlive, poolKey));
}

public void flushPartition(String partitionId) {
Expand Down
Expand Up @@ -42,10 +42,8 @@
import org.asynchttpclient.ntlm.NTLMEngine;
import org.asynchttpclient.ntlm.NTLMEngineException;
import org.asynchttpclient.providers.netty.commons.handler.ConnectionStrategy;
import org.asynchttpclient.providers.netty4.Callback;
import org.asynchttpclient.providers.netty4.NettyAsyncHttpProviderConfig;
import org.asynchttpclient.providers.netty4.channel.ChannelManager;
import org.asynchttpclient.providers.netty4.channel.Channels;
import org.asynchttpclient.providers.netty4.future.NettyResponseFuture;
import org.asynchttpclient.providers.netty4.request.NettyRequestSender;
import org.asynchttpclient.providers.netty4.response.NettyResponseBodyPart;
Expand Down Expand Up @@ -253,17 +251,9 @@ private boolean exitAfterHandling401(//
final Request nextRequest = new RequestBuilder(future.getRequest()).setHeaders(request.getHeaders()).setRealm(newRealm).build();

logger.debug("Sending authentication to {}", request.getUri());
if (future.isKeepAlive()) {
if (future.isKeepAlive() && !HttpHeaders.isTransferEncodingChunked(response)) {
future.setReuseChannel(true);
if (HttpHeaders.isTransferEncodingChunked(response)) {
Channels.setAttribute(channel, new Callback(future) {
public void call() throws IOException {
requestSender.drainChannelAndExecuteNextRequest(channel, future, nextRequest);
}
});
} else {
requestSender.sendNextRequest(nextRequest, future);
}
requestSender.drainChannelAndExecuteNextRequest(channel, future, nextRequest);
} else {
channelManager.closeChannel(channel);
requestSender.sendNextRequest(nextRequest, future);
Expand Down Expand Up @@ -332,10 +322,18 @@ private boolean exitAfterHandling407(//
.build();
}

future.setReuseChannel(true);
future.setConnectAllowed(true);
Request nextRequest = new RequestBuilder(future.getRequest()).setHeaders(requestHeaders).setRealm(newRealm).build();
requestSender.sendNextRequest(nextRequest, future);
final Request nextRequest = new RequestBuilder(future.getRequest()).setHeaders(request.getHeaders()).setRealm(newRealm).build();

logger.debug("Sending proxy authentication to {}", request.getUri());
if (future.isKeepAlive() && !HttpHeaders.isTransferEncodingChunked(response)) {
future.setConnectAllowed(true);
future.setReuseChannel(true);
requestSender.drainChannelAndExecuteNextRequest(channel, future, nextRequest);
} else {
channelManager.closeChannel(channel);
requestSender.sendNextRequest(nextRequest, future);
}

return true;
}
}
Expand Down Expand Up @@ -439,7 +437,7 @@ public void handle(final Channel channel, final NettyResponseFuture<?> future, f
try {
if (e instanceof HttpResponse) {
HttpResponse response = (HttpResponse) e;
// we buffer the response until we get the LastHttpContent
// we buffer the response until we get the first HttpContent
future.setPendingResponse(response);
return;

Expand Down

0 comments on commit 2535007

Please sign in to comment.