Skip to content

Commit

Permalink
Fix stream suspending with FeedableBodyGenerator.
Browse files Browse the repository at this point in the history
netty expects null as outgoing buffer to suspend the stream.
see ChunkedWriteHandler#222

- Added unit test for feedable body generator
- Fix netty 4 issue with feedable body generator
- provide integration test
  • Loading branch information
Patrick Haun authored and bomgar committed Aug 15, 2015
1 parent 1bd2a25 commit a456436
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 33 deletions.
27 changes: 22 additions & 5 deletions api/src/main/java/org/asynchttpclient/request/body/generator/FeedableBodyGenerator.java 100644 → 100755
Expand Up @@ -32,6 +32,8 @@ public final class FeedableBodyGenerator implements BodyGenerator {
private final Queue<BodyPart> queue = new ConcurrentLinkedQueue<>();
private FeedListener listener;

private boolean writeChunkBoundaries = true;

@Override
public Body createBody() {
return new PushBody();
Expand All @@ -52,11 +54,15 @@ public void setListener(FeedListener listener) {
this.listener = listener;
}

public void setWriteChunkBoundaries(boolean writeChunkBoundaries) {
this.writeChunkBoundaries = writeChunkBoundaries;
}

private static enum PushBodyState {
ONGOING, CLOSING, FINISHED;
}

private final class PushBody implements Body {
public final class PushBody implements Body {

private PushBodyState state = PushBodyState.ONGOING;

Expand All @@ -83,19 +89,30 @@ public long read(final ByteBuffer buffer) throws IOException {
return -1;
}
}
if(nextPart.buffer.remaining() == 0) {
// skip empty buffers
// if we return 0 here it would suspend the stream - we don't want that
queue.remove();
if(nextPart.isLast) {
state = writeChunkBoundaries ? PushBodyState.CLOSING : PushBodyState.FINISHED;
}
return read(buffer);
}
int capacity = buffer.remaining() - 10; // be safe (we'll have to add size, ending, etc.)
int size = Math.min(nextPart.buffer.remaining(), capacity);
if (size != 0) {
buffer.put(Integer.toHexString(size).getBytes(US_ASCII));
buffer.put(END_PADDING);
if(writeChunkBoundaries) {
buffer.put(Integer.toHexString(size).getBytes(US_ASCII));
buffer.put(END_PADDING);
}
for (int i = 0; i < size; i++) {
buffer.put(nextPart.buffer.get());
}
buffer.put(END_PADDING);
if(writeChunkBoundaries) buffer.put(END_PADDING);
}
if (!nextPart.buffer.hasRemaining()) {
if (nextPart.isLast) {
state = PushBodyState.CLOSING;
state = writeChunkBoundaries ? PushBodyState.CLOSING : PushBodyState.FINISHED;
}
queue.remove();
}
Expand Down
105 changes: 78 additions & 27 deletions api/src/test/java/org/asynchttpclient/request/body/ChunkingTest.java 100644 → 100755
Expand Up @@ -20,14 +20,18 @@

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.asynchttpclient.AbstractBasicTest;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.Response;
import org.asynchttpclient.request.body.generator.FeedableBodyGenerator;
import org.asynchttpclient.request.body.generator.InputStreamBodyGenerator;
import org.testng.annotations.Test;

Expand All @@ -40,28 +44,27 @@ abstract public class ChunkingTest extends AbstractBasicTest {
// So we can just test the returned data is the image,
// and doesn't contain the chunked delimeters.
@Test()
public void testBufferLargerThanFile() throws Throwable {
doTest(new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE), 400000));
public void testBufferLargerThanFileWithStreamBodyGenerator() throws Throwable {
doTestWithInputStreamBodyGenerator(new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE), 400000));
}

@Test()
public void testBufferSmallThanFile() throws Throwable {
doTest(new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE)));
public void testBufferSmallThanFileWithStreamBodyGenerator() throws Throwable {
doTestWithInputStreamBodyGenerator(new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE)));
}

@Test()
public void testDirectFile() throws Throwable {
doTest(new FileInputStream(LARGE_IMAGE_FILE));
public void testDirectFileWithStreamBodyGenerator() throws Throwable {
doTestWithInputStreamBodyGenerator(new FileInputStream(LARGE_IMAGE_FILE));
}

public void doTest(InputStream is) throws Throwable {
AsyncHttpClientConfig.Builder bc = new AsyncHttpClientConfig.Builder()//
.setAllowPoolingConnections(true)//
.setMaxConnectionsPerHost(1)//
.setMaxConnections(1)//
.setConnectTimeout(1000)//
.setRequestTimeout(1000)
.setFollowRedirect(true);
@Test()
public void testDirectFileWithFeedableBodyGenerator() throws Throwable {
doTestWithFeedableBodyGenerator(new FileInputStream(LARGE_IMAGE_FILE));
}

public void doTestWithInputStreamBodyGenerator(InputStream is) throws Throwable {
AsyncHttpClientConfig.Builder bc = httpClientBuilder();

try (AsyncHttpClient c = getAsyncHttpClient(bc.build())) {

Expand All @@ -71,20 +74,68 @@ public void doTest(InputStream is) throws Throwable {

Request r = builder.build();

Response response = c.executeRequest(r).get();
if (500 == response.getStatusCode()) {
StringBuilder sb = new StringBuilder();
sb.append("==============\n");
sb.append("500 response from call\n");
sb.append("Headers:" + response.getHeaders() + "\n");
sb.append("==============\n");
logger.debug(sb.toString());
assertEquals(response.getStatusCode(), 500, "Should have 500 status code");
assertTrue(response.getHeader("X-Exception").contains("invalid.chunk.length"), "Should have failed due to chunking");
fail("HARD Failing the test due to provided InputStreamBodyGenerator, chunking incorrectly:" + response.getHeader("X-Exception"));
} else {
assertEquals(response.getResponseBodyAsBytes(), LARGE_IMAGE_BYTES);
final ListenableFuture<Response> responseFuture = c.executeRequest(r);
waitForAndAssertResponse(responseFuture);
}
}

public void doTestWithFeedableBodyGenerator(InputStream is) throws Throwable {
AsyncHttpClientConfig.Builder bc = httpClientBuilder();

try (AsyncHttpClient c = getAsyncHttpClient(bc.build())) {

RequestBuilder builder = new RequestBuilder("POST");
builder.setUrl(getTargetUrl());
final FeedableBodyGenerator feedableBodyGenerator = new FeedableBodyGenerator();
builder.setBody(feedableBodyGenerator);

Request r = builder.build();

final ListenableFuture<Response> responseFuture = c.executeRequest(r);

feed(feedableBodyGenerator, is);

waitForAndAssertResponse(responseFuture);
}
}

private void feed(FeedableBodyGenerator feedableBodyGenerator, InputStream is) throws IOException {
try(InputStream inputStream = is) {
byte[] buffer = new byte[512];
for(int i =0; (i = inputStream.read(buffer)) > -1;) {
byte[] chunk = new byte[i];
System.arraycopy(buffer, 0, chunk, 0, i);
feedableBodyGenerator.feed(ByteBuffer.wrap(chunk), false);
}
}
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true);

}

private AsyncHttpClientConfig.Builder httpClientBuilder() {
return new AsyncHttpClientConfig.Builder()//
.setAllowPoolingConnections(true)//
.setMaxConnectionsPerHost(1)//
.setMaxConnections(1)//
.setConnectTimeout(1000)//
.setRequestTimeout(1000)
.setFollowRedirect(true);
}

private void waitForAndAssertResponse(ListenableFuture<Response> responseFuture) throws InterruptedException, java.util.concurrent.ExecutionException, IOException {
Response response = responseFuture.get();
if (500 == response.getStatusCode()) {
StringBuilder sb = new StringBuilder();
sb.append("==============\n");
sb.append("500 response from call\n");
sb.append("Headers:" + response.getHeaders() + "\n");
sb.append("==============\n");
logger.debug(sb.toString());
assertEquals(response.getStatusCode(), 500, "Should have 500 status code");
assertTrue(response.getHeader("X-Exception").contains("invalid.chunk.length"), "Should have failed due to chunking");
fail("HARD Failing the test due to provided InputStreamBodyGenerator, chunking incorrectly:" + response.getHeader("X-Exception"));
} else {
assertEquals(response.getResponseBodyAsBytes(), LARGE_IMAGE_BYTES);
}
}
}
@@ -0,0 +1,87 @@
package org.asynchttpclient.request.body.generator;

import org.asynchttpclient.request.body.Body;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import static org.testng.Assert.*;

public class FeedableBodyGeneratorTest {

private FeedableBodyGenerator feedableBodyGenerator;
private TestFeedListener listener;

@BeforeMethod
public void setUp() throws Exception {
feedableBodyGenerator = new FeedableBodyGenerator();
listener = new TestFeedListener();
feedableBodyGenerator.setListener(listener);
}

@Test(groups = "standalone")
public void feedNotifiesListener() throws Exception {
feedableBodyGenerator.feed(ByteBuffer.allocate(0), false);
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true);
assertEquals(listener.getCalls(), 2);
}

@Test(groups = "standalone")
public void readingBytesReturnsFedContentWithEmptyLastBuffer() throws Exception {
byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII);
feedableBodyGenerator.feed(ByteBuffer.wrap(content), false);
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true);
Body body = feedableBodyGenerator.createBody();
assertEquals(readFromBody(body), "7\r\nTest123\r\n".getBytes(StandardCharsets.US_ASCII));
assertEquals(readFromBody(body), "0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
assertEquals(body.read(ByteBuffer.allocate(1)), -1);

}

@Test(groups = "standalone")
public void readingBytesReturnsFedContentWithFilledLastBuffer() throws Exception {
byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII);
feedableBodyGenerator.feed(ByteBuffer.wrap(content), true);
Body body = feedableBodyGenerator.createBody();
assertEquals(readFromBody(body), "7\r\nTest123\r\n".getBytes(StandardCharsets.US_ASCII));
assertEquals(readFromBody(body), "0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
assertEquals(body.read(ByteBuffer.allocate(1)), -1);

}

@Test(groups = "standalone")
public void readingBytesReturnsFedContentWithoutChunkBoundariesWhenDisabled() throws Exception {
byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII);
feedableBodyGenerator.setWriteChunkBoundaries(false);
feedableBodyGenerator.feed(ByteBuffer.wrap(content), true);
Body body = feedableBodyGenerator.createBody();
assertEquals(readFromBody(body), "Test123".getBytes(StandardCharsets.US_ASCII));
assertEquals(body.read(ByteBuffer.allocate(1)), -1);

}

private byte[] readFromBody(Body body) throws IOException {
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
long read = body.read(byteBuffer);
byteBuffer.flip();
byte[] readBytes = new byte[byteBuffer.remaining()];
byteBuffer.get(readBytes);
return readBytes;
}

private static class TestFeedListener implements FeedableBodyGenerator.FeedListener {

private int calls;
@Override
public void onContentAdded() {
calls++;
}

public int getCalls() {
return calls;
}
}
}
Expand Up @@ -16,6 +16,7 @@
import java.nio.ByteBuffer;

import org.asynchttpclient.request.body.Body;
import org.asynchttpclient.request.body.generator.FeedableBodyGenerator;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.stream.ChunkedInput;

Expand Down Expand Up @@ -57,6 +58,9 @@ public Object nextChunk() throws Exception {
if (r < 0L) {
endOfInput = true;
return null;
} else if(r == 0 && body instanceof FeedableBodyGenerator.PushBody) {
//this will suspend the stream in ChunkedWriteHandler
return null;
} else {
endOfInput = r == contentLength || r < chunkSize && contentLength > 0;
buffer.flip();
Expand Down
Expand Up @@ -19,6 +19,7 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput;
import org.asynchttpclient.request.body.generator.FeedableBodyGenerator;

import java.nio.ByteBuffer;

Expand Down Expand Up @@ -57,6 +58,9 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
if (r < 0L) {
endOfInput = true;
return null;
} else if(r == 0 && body instanceof FeedableBodyGenerator.PushBody) {
//this will suspend the stream in ChunkedWriteHandler
return null;
} else {
endOfInput = r == contentLength || r < chunkSize && contentLength > 0;
buffer.flip();
Expand Down
Expand Up @@ -68,7 +68,9 @@ public void write(final Channel channel, NettyResponseFuture<?> future) throws I

BodyGenerator bg = future.getRequest().getBodyGenerator();
if (bg instanceof FeedableBodyGenerator) {
FeedableBodyGenerator.class.cast(bg).setListener(new FeedListener() {
final FeedableBodyGenerator feedableBodyGenerator = FeedableBodyGenerator.class.cast(bg);
feedableBodyGenerator.setWriteChunkBoundaries(false);
feedableBodyGenerator.setListener(new FeedListener() {
@Override
public void onContentAdded() {
channel.pipeline().get(ChunkedWriteHandler.class).resumeTransfer();
Expand Down

0 comments on commit a456436

Please sign in to comment.