Skip to content

Commit

Permalink
Refactor Body API so that read return a State, close #956
Browse files Browse the repository at this point in the history
  • Loading branch information
slandelle committed Aug 24, 2015
1 parent af0df14 commit afa7418
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 91 deletions.
20 changes: 19 additions & 1 deletion api/src/main/java/org/asynchttpclient/request/body/Body.java
Expand Up @@ -22,6 +22,24 @@
*/
public interface Body extends Closeable {

enum State {

/**
* There's something to read
*/
Continue,

/**
* There's nothing to read and input has to suspend
*/
Suspend,

/**
* There's nothing to read and input has to stop
*/
Stop;
}

/**
* Gets the length of the body.
*
Expand All @@ -37,5 +55,5 @@ public interface Body extends Closeable {
* @throws IOException If the chunk could not be read.
*/
// FIXME introduce a visitor pattern so that Netty can pass a pooled buffer
long read(ByteBuffer buffer) throws IOException;
State read(ByteBuffer buffer) throws IOException;
}
Expand Up @@ -36,22 +36,21 @@ public long getContentLength() {
return bytes.length;
}

public long read(ByteBuffer byteBuffer) throws IOException {
public State read(ByteBuffer byteBuffer) throws IOException {

if (eof) {
return -1;
return State.Stop;
}

final int remaining = bytes.length - lastPosition;
if (remaining <= byteBuffer.capacity()) {
byteBuffer.put(bytes, lastPosition, remaining);
eof = true;
return remaining;
} else {
byteBuffer.put(bytes, lastPosition, byteBuffer.capacity());
lastPosition = lastPosition + byteBuffer.capacity();
return byteBuffer.capacity();
}
return State.Continue;
}

public void close() throws IOException {
Expand Down
Expand Up @@ -60,47 +60,43 @@ public void writeChunkBoundaries() {
this.writeChunkBoundaries = true;
}

private enum PushBodyState {
ONGOING, FINISHED;
}

public final class PushBody implements Body {

private PushBodyState state = PushBodyState.ONGOING;
private State state = State.Continue;

@Override
public long getContentLength() {
return -1;
}

@Override
public long read(final ByteBuffer buffer) throws IOException {
public State read(final ByteBuffer buffer) throws IOException {
switch (state) {
case ONGOING:
case Continue:
return readNextPart(buffer);
case FINISHED:
return -1;
case Stop:
return State.Stop;
default:
throw new IllegalStateException("Illegal process state.");
}
}

private long readNextPart(ByteBuffer buffer) throws IOException {
int reads = 0;
while (buffer.hasRemaining() && state != PushBodyState.FINISHED) {
private State readNextPart(ByteBuffer buffer) throws IOException {
State res = State.Suspend;
while (buffer.hasRemaining() && state != State.Stop) {
BodyPart nextPart = queue.peek();
if (nextPart == null) {
// Nothing in the queue. suspend stream if nothing was read. (reads == 0)
return reads;
return res;
} else if (!nextPart.buffer.hasRemaining() && !nextPart.isLast) {
// skip empty buffers
queue.remove();
} else {
res = State.Continue;
readBodyPart(buffer, nextPart);
reads++;
}
}
return reads;
return res;
}

private void readBodyPart(ByteBuffer buffer, BodyPart part) {
Expand All @@ -111,7 +107,7 @@ private void readBodyPart(ByteBuffer buffer, BodyPart part) {

if (!part.buffer.hasRemaining() && !part.endPadding.hasRemaining()) {
if (part.isLast) {
state = PushBodyState.FINISHED;
state = State.Stop;
}
queue.remove();
}
Expand Down
Expand Up @@ -66,7 +66,7 @@ public long getContentLength() {
return -1L;
}

public long read(ByteBuffer buffer) throws IOException {
public State read(ByteBuffer buffer) throws IOException {

// To be safe.
chunk = new byte[buffer.remaining() - 10];
Expand All @@ -86,35 +86,27 @@ public long read(ByteBuffer buffer) throws IOException {
// - Then a separate packet of "\r\n".getBytes()
if (!eof) {
endDataCount++;
if (endDataCount == 2)
eof = true;

if (endDataCount == 1)
buffer.put(ZERO);
else if (endDataCount == 2)
eof = true;

buffer.put(END_PADDING);

return buffer.position();
} else {
eof = false;
}
return -1;
} else {
// Netty 3.2.3 doesn't support chunking encoding properly, so we chunk encoding ourself.
buffer.put(Integer.toHexString(read).getBytes());
// Chunking is separated by "<bytesreads>\r\n"
buffer.put(END_PADDING);
buffer.put(chunk, 0, read);
// Was missing the final chunk \r\n.
buffer.put(END_PADDING);
}

/**
* Netty 3.2.3 doesn't support chunking encoding properly, so we chunk encoding ourself.
*/

buffer.put(Integer.toHexString(read).getBytes());
// Chunking is separated by "<bytesreads>\r\n"
buffer.put(END_PADDING);
buffer.put(chunk, 0, read);
// Was missing the final chunk \r\n.
buffer.put(END_PADDING);
} else if (read > 0) {
buffer.put(chunk, 0, read);
}
return read;
return read < 0 ? State.Stop : State.Continue;
}

public void close() throws IOException {
Expand Down
Expand Up @@ -93,14 +93,14 @@ public long transferTo(long position, WritableByteChannel target) throws IOExcep
}

// Regular Body API
public long read(ByteBuffer buffer) throws IOException {
public State read(ByteBuffer buffer) throws IOException {
try {
int overallLength = 0;

int maxLength = buffer.remaining();

if (currentPart == parts.size() && transfertDone) {
return -1;
return State.Stop;
}

boolean full = false;
Expand Down Expand Up @@ -173,11 +173,11 @@ public long read(ByteBuffer buffer) throws IOException {
}
}
}
return overallLength;
return transfertDone ? State.Continue : State.Stop;

} catch (Exception e) {
LOGGER.error("Read exception", e);
return 0;
return State.Stop;
}
}

Expand Down
Expand Up @@ -14,6 +14,7 @@
package org.asynchttpclient.request.body.generator;

import org.asynchttpclient.request.body.Body;
import org.asynchttpclient.request.body.Body.State;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -51,7 +52,7 @@ public void readingBytesReturnsFedContentWithEmptyLastBufferWhenChunkBoundariesE
assertEquals(readFromBody(body), "7\r\nTest123\r\n".getBytes(StandardCharsets.US_ASCII));
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true);
assertEquals(readFromBody(body), "0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
assertEquals(body.read(ByteBuffer.allocate(1)), -1);
assertEquals(body.read(ByteBuffer.allocate(1)), State.Stop);
}

@Test(groups = "standalone")
Expand All @@ -62,7 +63,7 @@ public void readingBytesReturnsFedContentWithEmptyLastBufferWhenChunkBoundariesE
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true);
Body body = feedableBodyGenerator.createBody();
assertEquals(readFromBody(body), "7\r\nTest123\r\n0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
assertEquals(body.read(ByteBuffer.allocate(1)), -1);
assertEquals(body.read(ByteBuffer.allocate(1)), State.Stop);
}

@Test(groups = "standalone")
Expand All @@ -72,7 +73,7 @@ public void readingBytesReturnsFedContentWithFilledLastBufferWhenChunkBoundaries
feedableBodyGenerator.feed(ByteBuffer.wrap(content), true);
Body body = feedableBodyGenerator.createBody();
assertEquals(readFromBody(body), "7\r\nTest123\r\n0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
assertEquals(body.read(ByteBuffer.allocate(1)), -1);
assertEquals(body.read(ByteBuffer.allocate(1)), State.Stop);

}

Expand All @@ -82,7 +83,7 @@ public void readingBytesReturnsFedContentWithoutChunkBoundariesWhenNotEnabled()
feedableBodyGenerator.feed(ByteBuffer.wrap(content), true);
Body body = feedableBodyGenerator.createBody();
assertEquals(readFromBody(body), "Test123".getBytes(StandardCharsets.US_ASCII));
assertEquals(body.read(ByteBuffer.allocate(1)), -1);
assertEquals(body.read(ByteBuffer.allocate(1)), State.Stop);
}


Expand All @@ -94,7 +95,7 @@ public void returnZeroToSuspendStreamWhenNothingIsInQueue() throws Exception {

Body body = feedableBodyGenerator.createBody();
assertEquals(readFromBody(body), "7\r\nTest123\r\n".getBytes(StandardCharsets.US_ASCII));
assertEquals(body.read(ByteBuffer.allocate(1)), 0);
assertEquals(body.read(ByteBuffer.allocate(1)), State.Suspend);
}

private byte[] readFromBody(Body body) throws IOException {
Expand Down
Expand Up @@ -16,6 +16,7 @@
import static org.testng.Assert.assertEquals;

import org.asynchttpclient.request.body.Body;
import org.asynchttpclient.request.body.Body.State;
import org.asynchttpclient.request.body.generator.ByteArrayBodyGenerator;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -44,11 +45,11 @@ public void testSingleRead() throws IOException {
final ByteBuffer chunkBuffer = ByteBuffer.allocate(chunkSize);

// should take 1 read to get through the srcArray
assertEquals(body.read(chunkBuffer), srcArraySize);
body.read(chunkBuffer);
assertEquals(chunkBuffer.position(), srcArraySize, "bytes read");
chunkBuffer.clear();

assertEquals(body.read(chunkBuffer), -1, "body at EOF");
assertEquals(body.read(chunkBuffer), State.Stop, "body at EOF");
}

@Test(groups = "standalone")
Expand All @@ -65,7 +66,7 @@ public void testMultipleReads() throws IOException {

int reads = 0;
int bytesRead = 0;
while (body.read(chunkBuffer) != -1) {
while (body.read(chunkBuffer) != State.Stop) {
reads += 1;
bytesRead += chunkBuffer.position();
chunkBuffer.clear();
Expand Down
Expand Up @@ -16,6 +16,7 @@

import org.asynchttpclient.FluentCaseInsensitiveStringsMap;
import org.asynchttpclient.request.body.Body;
import org.asynchttpclient.request.body.Body.State;
import org.asynchttpclient.request.body.multipart.ByteArrayPart;
import org.asynchttpclient.request.body.multipart.FilePart;
import org.asynchttpclient.request.body.multipart.MultipartUtils;
Expand All @@ -35,7 +36,7 @@
public class MultipartBodyTest {

@Test(groups = "fast")
public void testBasics() {
public void testBasics() throws IOException {
final List<Part> parts = new ArrayList<>();

// add a file
Expand Down Expand Up @@ -64,30 +65,20 @@ private static File getTestfile() {
return file;
}

private static void compareContentLength(final List<Part> parts) {
private static void compareContentLength(final List<Part> parts) throws IOException {
Assert.assertNotNull(parts);
// get expected values
final Body multipartBody = MultipartUtils.newMultipartBody(parts, new FluentCaseInsensitiveStringsMap());
final long expectedContentLength = multipartBody.getContentLength();
try {
final ByteBuffer buffer = ByteBuffer.allocate(8192);
boolean last = false;
long totalBytes = 0;
while (!last) {
long readBytes = 0;
try {
readBytes = multipartBody.read(buffer);
} catch (IOException ie) {
Assert.fail("read failure");
}
if (readBytes > 0) {
totalBytes += readBytes;
} else {
if (multipartBody.read(buffer) == State.Stop) {
last = true;
}
buffer.clear();
}
Assert.assertEquals(totalBytes, expectedContentLength);
Assert.assertEquals(buffer.position(), expectedContentLength);
} finally {
try {
multipartBody.close();
Expand Down
Expand Up @@ -16,7 +16,6 @@
import java.nio.ByteBuffer;

import org.asynchttpclient.request.body.Body;
import org.asynchttpclient.request.body.generator.FeedableBodyGenerator;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.stream.ChunkedInput;

Expand Down Expand Up @@ -54,17 +53,19 @@ public Object nextChunk() throws Exception {
return null;
} else {
ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
long r = body.read(buffer);
if (r < 0L) {
endOfInput = true;
return null;
} else if (r == 0 && body instanceof FeedableBodyGenerator.PushBody) {
//this will suspend the stream in ChunkedWriteHandler
return null;
} else {
endOfInput = r == contentLength || r < chunkSize && contentLength > 0;
buffer.flip();
return ChannelBuffers.wrappedBuffer(buffer);
Body.State state = body.read(buffer);
switch (state) {
case Stop:
endOfInput = true;
return null;
case Suspend:
//this will suspend the stream in ChunkedWriteHandler
return null;
case Continue:
buffer.flip();
return ChannelBuffers.wrappedBuffer(buffer);
default:
throw new IllegalStateException("Unknown state: " + state);
}
}
}
Expand Down

0 comments on commit afa7418

Please sign in to comment.