Skip to content

Commit

Permalink
Lazy load multipart ByteBuf, close #1030
Browse files Browse the repository at this point in the history
  • Loading branch information
slandelle committed Nov 5, 2015
1 parent f6087a4 commit 1667bd4
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 64 deletions.
Expand Up @@ -23,7 +23,8 @@

public class ByteArrayMultipartPart extends MultipartPart<ByteArrayPart> {

private final ByteBuf contentBuffer;
// lazy
private ByteBuf contentBuffer;

public ByteArrayMultipartPart(ByteArrayPart part, byte[] boundary) {
super(part, boundary);
Expand All @@ -37,14 +38,20 @@ protected long getContentLength() {

@Override
protected long transferContentTo(ByteBuf target) throws IOException {
return transfer(contentBuffer, target, MultipartState.POST_CONTENT);
return transfer(lazyLoadContentBuffer(), target, MultipartState.POST_CONTENT);
}

@Override
protected long transferContentTo(WritableByteChannel target) throws IOException {
return transfer(contentBuffer, target, MultipartState.POST_CONTENT);
return transfer(lazyLoadContentBuffer(), target, MultipartState.POST_CONTENT);
}

private ByteBuf lazyLoadContentBuffer() {
if (contentBuffer == null)
contentBuffer = Unpooled.wrappedBuffer(part.getBytes());
return contentBuffer;
}

@Override
public void close() {
super.close();
Expand Down
Expand Up @@ -25,33 +25,50 @@

public class MessageEndMultipartPart extends MultipartPart<FileLikePart> {

private final ByteBuf buffer;
// lazy
private ByteBuf contentBuffer;

public MessageEndMultipartPart(byte[] boundary) {
super(null, boundary);
buffer = ByteBufAllocator.DEFAULT.buffer((int) length());
buffer.writeBytes(EXTRA_BYTES).writeBytes(boundary).writeBytes(EXTRA_BYTES).writeBytes(CRLF_BYTES);
state = MultipartState.PRE_CONTENT;
}

@Override
public long transferTo(ByteBuf target) throws IOException {
return transfer(buffer, target, MultipartState.DONE);
return transfer(lazyLoadContentBuffer(), target, MultipartState.DONE);
}

@Override
public long transferTo(WritableByteChannel target) throws IOException {
slowTarget = false;
return transfer(buffer, target, MultipartState.DONE);
return transfer(lazyLoadContentBuffer(), target, MultipartState.DONE);
}

private ByteBuf lazyLoadContentBuffer() {
if (contentBuffer == null) {
contentBuffer = ByteBufAllocator.DEFAULT.buffer((int) getContentLength());
contentBuffer.writeBytes(EXTRA_BYTES).writeBytes(boundary).writeBytes(EXTRA_BYTES).writeBytes(CRLF_BYTES);
}
return contentBuffer;
}

@Override
protected int computePreContentLength() {
return 0;
}

@Override
protected ByteBuf computePreContentBytes() {
protected ByteBuf computePreContentBytes(int preContentLength) {
return Unpooled.EMPTY_BUFFER;
}

@Override
protected ByteBuf computePostContentBytes() {
protected int computePostContentLength() {
return 0;
}

@Override
protected ByteBuf computePostContentBytes(int postContentLength) {
return Unpooled.EMPTY_BUFFER;
}

Expand All @@ -72,6 +89,8 @@ protected long transferContentTo(WritableByteChannel target) throws IOException

@Override
public void close() {
buffer.release();
super.close();
if (contentBuffer != null)
contentBuffer.release();
}
}
Expand Up @@ -90,23 +90,25 @@ public abstract class MultipartPart<T extends FileLikePart> implements Closeable
protected final T part;
protected final byte[] boundary;

private final long length;
private ByteBuf preContentBuffer;
private ByteBuf postContentBuffer;
private final int preContentLength;
private final int postContentLength;
protected MultipartState state;
protected boolean slowTarget;

// lazy
private ByteBuf preContentBuffer;
private ByteBuf postContentBuffer;

public MultipartPart(T part, byte[] boundary) {
this.part = part;
this.boundary = boundary;
preContentBuffer = computePreContentBytes();
postContentBuffer = computePostContentBytes();
length = preContentBuffer.readableBytes() + postContentBuffer.readableBytes() + getContentLength();
preContentLength = computePreContentLength();
postContentLength = computePostContentLength();
state = MultipartState.PRE_CONTENT;
}

public long length() {
return length;
return preContentLength + postContentLength + getContentLength();
}

public MultipartState getState() {
Expand All @@ -124,13 +126,13 @@ public long transferTo(ByteBuf target) throws IOException {
return 0L;

case PRE_CONTENT:
return transfer(preContentBuffer, target, MultipartState.CONTENT);
return transfer(lazyLoadPreContentBuffer(), target, MultipartState.CONTENT);

case CONTENT:
return transferContentTo(target);

case POST_CONTENT:
return transfer(postContentBuffer, target, MultipartState.DONE);
return transfer(lazyLoadPostContentBuffer(), target, MultipartState.DONE);

default:
throw new IllegalStateException("Unknown state " + state);
Expand All @@ -145,23 +147,37 @@ public long transferTo(WritableByteChannel target) throws IOException {
return 0L;

case PRE_CONTENT:
return transfer(preContentBuffer, target, MultipartState.CONTENT);
return transfer(lazyLoadPreContentBuffer(), target, MultipartState.CONTENT);

case CONTENT:
return transferContentTo(target);

case POST_CONTENT:
return transfer(postContentBuffer, target, MultipartState.DONE);
return transfer(lazyLoadPostContentBuffer(), target, MultipartState.DONE);

default:
throw new IllegalStateException("Unknown state " + state);
}
}

private ByteBuf lazyLoadPreContentBuffer() {
if (preContentBuffer == null)
preContentBuffer = computePreContentBytes(preContentLength);
return preContentBuffer;
}

private ByteBuf lazyLoadPostContentBuffer() {
if (postContentBuffer == null)
postContentBuffer = computePostContentBytes(postContentLength);
return postContentBuffer;
}

@Override
public void close() {
preContentBuffer.release();
postContentBuffer.release();
if (preContentBuffer != null)
preContentBuffer.release();
if (postContentBuffer != null)
postContentBuffer.release();
}

protected abstract long getContentLength();
Expand Down Expand Up @@ -212,29 +228,27 @@ protected long transfer(ByteBuf source, WritableByteChannel target, MultipartSta
return transferred;
}

protected ByteBuf computePreContentBytes() {

// compute length
protected int computePreContentLength() {
CounterPartVisitor counterVisitor = new CounterPartVisitor();
visitPreContent(counterVisitor);
long length = counterVisitor.getCount();
return counterVisitor.getCount();
}

// compute bytes
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer((int) length);
protected ByteBuf computePreContentBytes(int preContentLength) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(preContentLength);
ByteBufVisitor bytesVisitor = new ByteBufVisitor(buffer);
visitPreContent(bytesVisitor);
return buffer;
}

protected ByteBuf computePostContentBytes() {

// compute length
protected int computePostContentLength() {
CounterPartVisitor counterVisitor = new CounterPartVisitor();
visitPostContent(counterVisitor);
long length = counterVisitor.getCount();
return counterVisitor.getCount();
}

// compute bytes
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer((int) length);
protected ByteBuf computePostContentBytes(int postContentLength) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(postContentLength);
ByteBufVisitor bytesVisitor = new ByteBufVisitor(buffer);
visitPostContent(bytesVisitor);
return buffer;
Expand Down
Expand Up @@ -24,7 +24,7 @@ public interface PartVisitor {

class CounterPartVisitor implements PartVisitor {

private long count = 0L;
private int count = 0;

@Override
public void withBytes(byte[] bytes) {
Expand All @@ -36,7 +36,7 @@ public void withByte(byte b) {
count++;
}

public long getCount() {
public int getCount() {
return count;
}
}
Expand Down
Expand Up @@ -13,6 +13,7 @@
package org.asynchttpclient.request.body.multipart;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaders;
Expand All @@ -24,19 +25,19 @@
import java.util.ArrayList;
import java.util.List;

import org.asynchttpclient.request.body.Body;
import org.apache.commons.io.IOUtils;
import org.asynchttpclient.request.body.Body.BodyState;
import org.testng.Assert;
import org.testng.annotations.Test;

public class MultipartBodyTest {

@Test(groups = "standalone")
public void testBasics() throws IOException {
public void testBasics() throws Exception {
final List<Part> parts = new ArrayList<>();

// add a file
final File testFile = getTestfile();
System.err.println(testFile.length());
parts.add(new FilePart("filePart", testFile));

// add a byte array
Expand All @@ -48,38 +49,25 @@ public void testBasics() throws IOException {
compareContentLength(parts);
}

private static File getTestfile() {
private static File getTestfile() throws URISyntaxException {
final ClassLoader cl = MultipartBodyTest.class.getClassLoader();
final URL url = cl.getResource("textfile.txt");
Assert.assertNotNull(url);
File file = null;
try {
file = new File(url.toURI());
} catch (URISyntaxException use) {
Assert.fail("uri syntax error");
}
return file;
assertNotNull(url);
return new File(url.toURI());
}

private static void compareContentLength(final List<Part> parts) throws IOException {
Assert.assertNotNull(parts);
assertNotNull(parts);
// get expected values
final Body multipartBody = MultipartUtils.newMultipartBody(parts, HttpHeaders.EMPTY_HEADERS);
final MultipartBody multipartBody = MultipartUtils.newMultipartBody(parts, HttpHeaders.EMPTY_HEADERS);
final long expectedContentLength = multipartBody.getContentLength();
try {
final ByteBuf buffer = Unpooled.buffer(8192);
boolean last = false;
while (!last) {
if (multipartBody.transferTo(buffer) == BodyState.STOP) {
last = true;
}
while (multipartBody.transferTo(buffer) != BodyState.STOP) {
}
Assert.assertEquals(buffer.readableBytes(), expectedContentLength);
assertEquals(buffer.readableBytes(), expectedContentLength);
} finally {
try {
multipartBody.close();
} catch (IOException ignore) {
}
IOUtils.closeQuietly(multipartBody);
}
}
}

0 comments on commit 1667bd4

Please sign in to comment.