From c2c1342fd3a41061fff71b46bc59fc4f08ddb4ee Mon Sep 17 00:00:00 2001 From: gail Date: Mon, 9 May 2011 10:01:41 -0700 Subject: [PATCH 1/2] Possibly fixed regression test issues. --- .../ning/http/multipart/MultipartBody.java | 331 ++++++++++++++++-- 1 file changed, 305 insertions(+), 26 deletions(-) diff --git a/src/main/java/com/ning/http/multipart/MultipartBody.java b/src/main/java/com/ning/http/multipart/MultipartBody.java index 26e388193a..bedffc346a 100644 --- a/src/main/java/com/ning/http/multipart/MultipartBody.java +++ b/src/main/java/com/ning/http/multipart/MultipartBody.java @@ -16,8 +16,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; @@ -35,6 +37,15 @@ public class MultipartBody implements RandomAccessBody { private List files; private int startPart; private final static Logger logger = LoggerFactory.getLogger(MultipartBody.class); + ByteArrayInputStream currentStream; + int currentStreamPosition; + boolean endWritten; + boolean doneWritingParts; + FileLocation fileLocation; + FilePart currentFilePart; + FileChannel currentFileChannel; + + enum FileLocation{ NONE, START, MIDDLE, END}; public MultipartBody(List parts, String boundary, String contentLength) { this.boundary = MultipartEncodingUtil.getAsciiBytes(boundary.substring("multipart/form-data; boundary=".length())); @@ -44,6 +55,11 @@ public MultipartBody(List parts, String boundary, Str files = new ArrayList(); startPart = 0; + currentStreamPosition = -1; + endWritten = false; + doneWritingParts = false; + fileLocation = FileLocation.NONE; + currentFilePart = null; } public void close() throws IOException { @@ -57,11 +73,242 @@ public long getContentLength() { } public long read(ByteBuffer buffer) throws IOException { - // TODO Not implemented - return 0; + try { + int overallLength = 0; + + int maxLength = buffer.capacity(); + + if (startPart == parts.size()&& endWritten) { + return overallLength; + } + + boolean full = false; + while(!full && !doneWritingParts) { + com.ning.http.client.Part part = null; + if(startPart < parts.size()) { + part = parts.get(startPart); + } + if(currentFileChannel != null) { + overallLength += currentFileChannel.read(buffer); + + if(currentFileChannel.position() == currentFileChannel.size()) { + currentFileChannel.close(); + currentFileChannel = null; + } + + if(overallLength == maxLength) { + full = true; + } + } + else if(currentStreamPosition > -1) { + overallLength += writeToBuffer(buffer, maxLength - overallLength); + + if(overallLength == maxLength) { + full = true; + } + if(startPart == parts.size() && currentStream.available() == 0) { + doneWritingParts = true; + } + } + else if (part instanceof StringPart) { + StringPart currentPart = (StringPart)part; + + initializeStringPart(currentPart); + + startPart++; + }else if(part instanceof com.ning.http.client.StringPart) { + StringPart currentPart = generateClientStringpart(part); + + initializeStringPart(currentPart); + + startPart++; + }else if(part instanceof FilePart) { + if(fileLocation == FileLocation.NONE) { + currentFilePart = (FilePart)part; + initializeFilePart(currentFilePart); + }else if(fileLocation == FileLocation.START) { + initializeFileBody(currentFilePart); + }else if(fileLocation == FileLocation.MIDDLE) { + initializeFileEnd(currentFilePart); + }else if(fileLocation == FileLocation.END) { + startPart++; + if(startPart == parts.size() && currentStream.available() == 0) { + doneWritingParts = true; + } + } + } + else if(part instanceof com.ning.http.client.FilePart) { + if(fileLocation == FileLocation.NONE) { + currentFilePart = generateClientFilePart(part); + initializeFilePart(currentFilePart); + }else if(fileLocation == FileLocation.START) { + initializeFileBody(currentFilePart); + }else if(fileLocation == FileLocation.MIDDLE) { + initializeFileEnd(currentFilePart); + }else if(fileLocation == FileLocation.END) { + startPart++; + if(startPart == parts.size() && currentStream.available() == 0) { + doneWritingParts = true; + } + } + } + else if(part instanceof com.ning.http.client.ByteArrayPart) { + com.ning.http.client.ByteArrayPart bytePart = + (com.ning.http.client.ByteArrayPart)part; + + if(fileLocation == FileLocation.NONE) { + currentFilePart = + generateClientByteArrayPart(bytePart); + + initializeFilePart(currentFilePart); + }else if(fileLocation == FileLocation.START) { + initializeByteArrayBody(currentFilePart); + }else if(fileLocation == FileLocation.MIDDLE) { + initializeFileEnd(currentFilePart); + }else if(fileLocation == FileLocation.END) { + startPart++; + if(startPart == parts.size() && currentStream.available() == 0) { + doneWritingParts = true; + } + } + } + } + + if(doneWritingParts) { + if(currentStreamPosition == -1) { + ByteArrayOutputStream endWriter = new ByteArrayOutputStream(); + + Part.sendMessageEnd(endWriter, boundary); + + initializeBuffer(endWriter); + } + + if(currentStreamPosition > -1) { + overallLength += writeToBuffer(buffer, maxLength - overallLength); + + if(currentStream.available() == 0) { + currentStream.close(); + currentStreamPosition = -1; + endWritten = true; + } + } + } + return overallLength; + + }catch(Exception e) { + e.printStackTrace(); + return 0; + } + } + + private void initializeByteArrayBody(FilePart filePart) + throws IOException { + + ByteArrayOutputStream output = generateByteArrayBody(filePart); + + initializeBuffer(output); + + fileLocation = FileLocation.MIDDLE; + } + + private void initializeFileEnd(FilePart currentPart) + throws IOException { + + ByteArrayOutputStream output = generateFileEnd(currentPart); + + initializeBuffer(output); + + fileLocation = FileLocation.END; + + } + + private void initializeFileBody(FilePart currentPart) + throws IOException { + + if ( FilePartSource.class.isAssignableFrom( currentPart.getSource().getClass())) { + + FilePartSource source = (FilePartSource) currentPart.getSource(); + + File file = source.getFile(); + + RandomAccessFile raf = new RandomAccessFile(file, "r"); + files.add(raf); + + currentFileChannel = raf.getChannel(); + + } else { + PartSource partSource = currentPart.getSource(); + + InputStream stream = partSource.createInputStream(); + + byte[] bytes = new byte[(int)partSource.getLength()]; + + stream.read(bytes); + + currentStream = new ByteArrayInputStream(bytes); + + currentStreamPosition = 0; + } + + fileLocation = FileLocation.MIDDLE; + } + + private void initializeFilePart(FilePart filePart) + throws IOException { + + filePart.setPartBoundary(boundary); + + ByteArrayOutputStream output = generateFileStart(filePart); + + initializeBuffer(output); + + fileLocation = FileLocation.START; } - public long transferTo(long position, long count, WritableByteChannel target) + private void initializeStringPart(StringPart currentPart) + throws IOException { + currentPart.setPartBoundary(boundary); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + Part.sendPart(outputStream, currentPart, boundary); + + initializeBuffer(outputStream); + } + + private int writeToBuffer(ByteBuffer buffer, int length) + throws IOException { + + int available = currentStream.available(); + + int writeLength = Math.min(available, length); + + byte[] bytes = new byte[writeLength]; + + currentStream.read(bytes); + + buffer.put(bytes); + + if(available <= length) { + currentStream.close(); + currentStreamPosition = -1; + }else { + currentStreamPosition += writeLength; + } + + return writeLength; + } + + private void initializeBuffer(ByteArrayOutputStream outputStream) + throws IOException { + + currentStream = new ByteArrayInputStream(outputStream.toByteArray()); + + currentStreamPosition = 0; + + } + + public long transferTo(long position, long count, WritableByteChannel target) throws IOException { long overallLength = 0; @@ -97,23 +344,17 @@ private long handleClientPart( WritableByteChannel target, com.ning.http.client.Part part) throws IOException { if (part.getClass().equals(com.ning.http.client.StringPart.class)) { - com.ning.http.client.StringPart stringPart = (com.ning.http.client.StringPart) part; - - StringPart currentPart = new StringPart(stringPart.getName(), stringPart.getValue()); + StringPart currentPart = generateClientStringpart(part); return handleStringPart(target, currentPart); } else if (part.getClass().equals(com.ning.http.client.FilePart.class)) { - com.ning.http.client.FilePart currentPart = (com.ning.http.client.FilePart) part; - - FilePart filePart = new FilePart(currentPart.getName(), currentPart.getFile()); + FilePart filePart = generateClientFilePart(part); return handleFilePart(target, filePart); } else if (part.getClass().equals(com.ning.http.client.ByteArrayPart.class)) { com.ning.http.client.ByteArrayPart bytePart = (com.ning.http.client.ByteArrayPart) part; - ByteArrayPartSource source = new ByteArrayPartSource(bytePart.getFileName(), bytePart.getData()); - - FilePart filePart = new FilePart(bytePart.getName(), source, bytePart.getMimeType(), bytePart.getCharSet()); + FilePart filePart = generateClientByteArrayPart(bytePart); return handleByteArrayPart(target, filePart, bytePart.getData()); } @@ -121,32 +362,70 @@ private long handleClientPart( return 0; } + private FilePart generateClientByteArrayPart( + com.ning.http.client.ByteArrayPart bytePart) { + ByteArrayPartSource source = new ByteArrayPartSource(bytePart.getFileName(), bytePart.getData()); + + FilePart filePart = new FilePart(bytePart.getName(), source, bytePart.getMimeType(), bytePart.getCharSet()); + return filePart; + } + + private FilePart generateClientFilePart(com.ning.http.client.Part part) + throws FileNotFoundException { + com.ning.http.client.FilePart currentPart = (com.ning.http.client.FilePart) part; + + FilePart filePart = new FilePart(currentPart.getName(), currentPart.getFile()); + return filePart; + } + + private StringPart generateClientStringpart(com.ning.http.client.Part part) { + com.ning.http.client.StringPart stringPart = (com.ning.http.client.StringPart) part; + + StringPart currentPart = new StringPart(stringPart.getName(), stringPart.getValue()); + return currentPart; + } + private long handleByteArrayPart(WritableByteChannel target, FilePart filePart, byte[] data) throws IOException { - int length = 0; + ByteArrayOutputStream output = generateByteArrayBody(filePart); + return writeToTarget(target, output); + } - ByteArrayOutputStream output = new ByteArrayOutputStream(); + private ByteArrayOutputStream generateByteArrayBody(FilePart filePart) + throws IOException { + ByteArrayOutputStream output = new ByteArrayOutputStream(); Part.sendPart(output, filePart, boundary); - length += writeToTarget(target, output); - return length; - - } + return output; + } private long handleFileEnd(WritableByteChannel target, FilePart filePart) throws IOException { - ByteArrayOutputStream endOverhead = new ByteArrayOutputStream(); - - filePart.sendEnd(endOverhead); + ByteArrayOutputStream endOverhead = generateFileEnd(filePart); return this.writeToTarget(target, endOverhead); } + private ByteArrayOutputStream generateFileEnd(FilePart filePart) + throws IOException { + ByteArrayOutputStream endOverhead = new ByteArrayOutputStream(); + + filePart.sendEnd(endOverhead); + return endOverhead; + } + private long handleFileHeaders(WritableByteChannel target, FilePart filePart) throws IOException { filePart.setPartBoundary(boundary); - ByteArrayOutputStream overhead = new ByteArrayOutputStream(); + ByteArrayOutputStream overhead = generateFileStart(filePart); + + return writeToTarget(target, overhead); + } + + private ByteArrayOutputStream generateFileStart(FilePart filePart) + throws IOException { + ByteArrayOutputStream overhead = new ByteArrayOutputStream(); filePart.setPartBoundary(boundary); @@ -155,9 +434,8 @@ private long handleFileHeaders(WritableByteChannel target, FilePart filePart) th filePart.sendContentTypeHeader(overhead); filePart.sendTransferEncodingHeader(overhead); filePart.sendEndOfHeader(overhead); - - return writeToTarget(target, overhead); - } + return overhead; + } private long handleFilePart(WritableByteChannel target, FilePart filePart) throws IOException { @@ -246,7 +524,8 @@ private long writeToTarget(WritableByteChannel target, ByteArrayOutputStream byt while ((target.isOpen()) && (written < byteWriter.size())) { ByteBuffer message = ByteBuffer.wrap(byteWriter.toByteArray()); written = target.write(message); - // TODO: This is dangerous to spin + // TODO: This is dangerous to spin, we need to find another way to wait until + //the byte channel is ready to receive the additional bytes or else data is lost. if (written != byteWriter.size()) { logger.info("Waiting for writing..."); try { From 1fe8eab77c164e8a3882924d29c0fe0d9e07bd47 Mon Sep 17 00:00:00 2001 From: gail Date: Thu, 12 May 2011 07:35:12 -0700 Subject: [PATCH 2/2] added spaces to push the version of the async provided i'm using. --- .../http/client/providers/netty/NettyAsyncHttpProvider.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java b/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java index 7b93606769..0165a94cc6 100644 --- a/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java +++ b/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java @@ -14,7 +14,7 @@ * under the License. */ package com.ning.http.client.providers.netty; - + import com.ning.http.client.AsyncHandler; import com.ning.http.client.AsyncHandler.STATE; import com.ning.http.client.AsyncHttpClientConfig; @@ -104,8 +104,10 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; + import java.security.GeneralSecurityException; import java.security.NoSuchAlgorithmException; + import java.util.ArrayList; import java.util.Collection; import java.util.Iterator;