Skip to content

Commit

Permalink
default behavior should target Netty 4, see #948
Browse files Browse the repository at this point in the history
  • Loading branch information
slandelle committed Aug 15, 2015
1 parent 77bbdfd commit f5f9b4b
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 77 deletions.
Expand Up @@ -32,7 +32,8 @@ public final class FeedableBodyGenerator implements BodyGenerator {
private final Queue<BodyPart> queue = new ConcurrentLinkedQueue<>(); private final Queue<BodyPart> queue = new ConcurrentLinkedQueue<>();
private FeedListener listener; private FeedListener listener;


private boolean writeChunkBoundaries = true; // must be set to true when using Netty 3 where native chunking is broken
private boolean writeChunkBoundaries = false;


@Override @Override
public Body createBody() { public Body createBody() {
Expand All @@ -54,8 +55,8 @@ public void setListener(FeedListener listener) {
this.listener = listener; this.listener = listener;
} }


public void setWriteChunkBoundaries(boolean writeChunkBoundaries) { public void writeChunkBoundaries() {
this.writeChunkBoundaries = writeChunkBoundaries; this.writeChunkBoundaries = true;
} }


private static enum PushBodyState { private static enum PushBodyState {
Expand Down Expand Up @@ -89,7 +90,7 @@ public long read(final ByteBuffer buffer) throws IOException {
return -1; return -1;
} }
} }
if(nextPart.buffer.remaining() == 0) { if (nextPart.buffer.remaining() == 0) {
// skip empty buffers // skip empty buffers
// if we return 0 here it would suspend the stream - we don't want that // if we return 0 here it would suspend the stream - we don't want that
queue.remove(); queue.remove();
Expand Down
@@ -1,3 +1,16 @@
/*
* Copyright (c) 2014 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient.request.body.generator; package org.asynchttpclient.request.body.generator;


import org.asynchttpclient.request.body.Body; import org.asynchttpclient.request.body.Body;
Expand All @@ -12,76 +25,77 @@


public class FeedableBodyGeneratorTest { public class FeedableBodyGeneratorTest {


private FeedableBodyGenerator feedableBodyGenerator; private FeedableBodyGenerator feedableBodyGenerator;
private TestFeedListener listener; private TestFeedListener listener;


@BeforeMethod @BeforeMethod
public void setUp() throws Exception { public void setUp() throws Exception {
feedableBodyGenerator = new FeedableBodyGenerator(); feedableBodyGenerator = new FeedableBodyGenerator();
listener = new TestFeedListener(); listener = new TestFeedListener();
feedableBodyGenerator.setListener(listener); feedableBodyGenerator.setListener(listener);
} }


@Test(groups = "standalone") @Test(groups = "standalone")
public void feedNotifiesListener() throws Exception { public void feedNotifiesListener() throws Exception {
feedableBodyGenerator.feed(ByteBuffer.allocate(0), false); feedableBodyGenerator.feed(ByteBuffer.allocate(0), false);
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true); feedableBodyGenerator.feed(ByteBuffer.allocate(0), true);
assertEquals(listener.getCalls(), 2); assertEquals(listener.getCalls(), 2);
} }


@Test(groups = "standalone") @Test(groups = "standalone")
public void readingBytesReturnsFedContentWithEmptyLastBuffer() throws Exception { public void readingBytesReturnsFedContentWithEmptyLastBufferWhenChunkBoundariesEnabled() throws Exception {
byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII); feedableBodyGenerator.writeChunkBoundaries();
feedableBodyGenerator.feed(ByteBuffer.wrap(content), false); byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII);
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true); feedableBodyGenerator.feed(ByteBuffer.wrap(content), false);
Body body = feedableBodyGenerator.createBody(); feedableBodyGenerator.feed(ByteBuffer.allocate(0), true);
assertEquals(readFromBody(body), "7\r\nTest123\r\n".getBytes(StandardCharsets.US_ASCII)); Body body = feedableBodyGenerator.createBody();
assertEquals(readFromBody(body), "0\r\n\r\n".getBytes(StandardCharsets.US_ASCII)); assertEquals(readFromBody(body), "7\r\nTest123\r\n".getBytes(StandardCharsets.US_ASCII));
assertEquals(body.read(ByteBuffer.allocate(1)), -1); assertEquals(readFromBody(body), "0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));

assertEquals(body.read(ByteBuffer.allocate(1)), -1);
}

}
@Test(groups = "standalone")
public void readingBytesReturnsFedContentWithFilledLastBuffer() throws Exception { @Test(groups = "standalone")
byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII); public void readingBytesReturnsFedContentWithFilledLastBufferWhenChunkBoundariesEnabled() throws Exception {
feedableBodyGenerator.feed(ByteBuffer.wrap(content), true); feedableBodyGenerator.writeChunkBoundaries();
Body body = feedableBodyGenerator.createBody(); byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII);
assertEquals(readFromBody(body), "7\r\nTest123\r\n".getBytes(StandardCharsets.US_ASCII)); feedableBodyGenerator.feed(ByteBuffer.wrap(content), true);
assertEquals(readFromBody(body), "0\r\n\r\n".getBytes(StandardCharsets.US_ASCII)); Body body = feedableBodyGenerator.createBody();
assertEquals(body.read(ByteBuffer.allocate(1)), -1); assertEquals(readFromBody(body), "7\r\nTest123\r\n".getBytes(StandardCharsets.US_ASCII));

assertEquals(readFromBody(body), "0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
} assertEquals(body.read(ByteBuffer.allocate(1)), -1);


@Test(groups = "standalone") }
public void readingBytesReturnsFedContentWithoutChunkBoundariesWhenDisabled() throws Exception {
byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII); @Test(groups = "standalone")
feedableBodyGenerator.setWriteChunkBoundaries(false); public void readingBytesReturnsFedContentWithoutChunkBoundariesWhenNotEnabled() throws Exception {
feedableBodyGenerator.feed(ByteBuffer.wrap(content), true); byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII);
Body body = feedableBodyGenerator.createBody(); feedableBodyGenerator.feed(ByteBuffer.wrap(content), true);
assertEquals(readFromBody(body), "Test123".getBytes(StandardCharsets.US_ASCII)); Body body = feedableBodyGenerator.createBody();
assertEquals(body.read(ByteBuffer.allocate(1)), -1); assertEquals(readFromBody(body), "Test123".getBytes(StandardCharsets.US_ASCII));

assertEquals(body.read(ByteBuffer.allocate(1)), -1);
} }


private byte[] readFromBody(Body body) throws IOException { private byte[] readFromBody(Body body) throws IOException {
ByteBuffer byteBuffer = ByteBuffer.allocate(512); ByteBuffer byteBuffer = ByteBuffer.allocate(512);
body.read(byteBuffer); body.read(byteBuffer);
byteBuffer.flip(); byteBuffer.flip();
byte[] readBytes = new byte[byteBuffer.remaining()]; byte[] readBytes = new byte[byteBuffer.remaining()];
byteBuffer.get(readBytes); byteBuffer.get(readBytes);
return readBytes; return readBytes;
}

private static class TestFeedListener implements FeedableBodyGenerator.FeedListener {

private int calls;
@Override
public void onContentAdded() {
calls++;
} }


public int getCalls() { private static class TestFeedListener implements FeedableBodyGenerator.FeedListener {
return calls;
private int calls;

@Override
public void onContentAdded() {
calls++;
}

public int getCalls() {
return calls;
}
} }
}
} }
Expand Up @@ -66,7 +66,9 @@ public void write(final Channel channel, NettyResponseFuture<?> future) throws I


BodyGenerator bg = future.getRequest().getBodyGenerator(); BodyGenerator bg = future.getRequest().getBodyGenerator();
if (bg instanceof FeedableBodyGenerator) { if (bg instanceof FeedableBodyGenerator) {
FeedableBodyGenerator.class.cast(bg).setListener(new FeedListener() { final FeedableBodyGenerator feedableBodyGenerator = (FeedableBodyGenerator) bg;
feedableBodyGenerator.writeChunkBoundaries();
feedableBodyGenerator.setListener(new FeedListener() {
@Override @Override
public void onContentAdded() { public void onContentAdded() {
channel.getPipeline().get(ChunkedWriteHandler.class).resumeTransfer(); channel.getPipeline().get(ChunkedWriteHandler.class).resumeTransfer();
Expand Down
Expand Up @@ -68,9 +68,7 @@ public void write(final Channel channel, NettyResponseFuture<?> future) throws I


BodyGenerator bg = future.getRequest().getBodyGenerator(); BodyGenerator bg = future.getRequest().getBodyGenerator();
if (bg instanceof FeedableBodyGenerator) { if (bg instanceof FeedableBodyGenerator) {
final FeedableBodyGenerator feedableBodyGenerator = (FeedableBodyGenerator) bg; FeedableBodyGenerator.class.cast(bg).setListener(new FeedListener() {
feedableBodyGenerator.setWriteChunkBoundaries(false);
feedableBodyGenerator.setListener(new FeedListener() {
@Override @Override
public void onContentAdded() { public void onContentAdded() {
channel.pipeline().get(ChunkedWriteHandler.class).resumeTransfer(); channel.pipeline().get(ChunkedWriteHandler.class).resumeTransfer();
Expand Down

0 comments on commit f5f9b4b

Please sign in to comment.