Skip to content

Commit

Permalink
Rework file upload so that NettyFileUpload is a ReadStream<Buffer>
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 2, 2019
1 parent 7f35f60 commit 3598209
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 91 deletions.
138 changes: 60 additions & 78 deletions src/main/java/io/vertx/core/http/impl/HttpServerFileUploadImpl.java
Expand Up @@ -17,9 +17,8 @@
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpServerFileUpload;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.streams.Pump;
import io.vertx.core.streams.impl.InboundBuffer;
import io.vertx.core.streams.ReadStream;

import java.nio.charset.Charset;

Expand All @@ -34,41 +33,86 @@
*/
class HttpServerFileUploadImpl implements HttpServerFileUpload {

private final HttpServerRequest req;
private final ReadStream<Buffer> stream;
private final Context context;
private final String name;
private final String filename;
private final String contentType;
private final String contentTransferEncoding;
private final Charset charset;

private Handler<Buffer> dataHandler;
private Handler<Void> endHandler;
private AsyncFile file;
private Handler<Throwable> exceptionHandler;

private long size;
private InboundBuffer<Buffer> pending;
private boolean ended;
private boolean completed;
private boolean lazyCalculateSize;

HttpServerFileUploadImpl(Context context, HttpServerRequest req, String name, String filename, String contentType,
HttpServerFileUploadImpl(Context context, ReadStream<Buffer> stream, String name, String filename, String contentType,
String contentTransferEncoding,
Charset charset, long size) {
this.context = context;
this.req = req;
this.stream = stream;
this.name = name;
this.filename = filename;
this.contentType = contentType;
this.contentTransferEncoding = contentTransferEncoding;
this.charset = charset;
this.size = size;
this.pending = new InboundBuffer<Buffer>(context)
.drainHandler(v -> req.resume())
.emptyHandler(v -> checkComplete());
if (size == 0) {
lazyCalculateSize = true;
}

stream.handler(this::handleData);
stream.endHandler(this::handleEnd);
}

private void handleData(Buffer data) {
Handler<Buffer> h;
synchronized (HttpServerFileUploadImpl.this) {
h = dataHandler;
if (lazyCalculateSize) {
this.size += data.length();
}
}
if (h != null) {
h.handle(data);
}
}

private void handleEnd(Void v) {
AsyncFile toClose;
synchronized (this) {
lazyCalculateSize = false;
toClose = file;
}
if (toClose != null) {
toClose.close(ar -> {
if (ar.failed()) {
notifyExceptionHandler(ar.cause());
}
notifyEndHandler();
});
} else {
notifyEndHandler();
}
}

private void notifyEndHandler() {
Handler<Void> handler;
synchronized (this) {
handler = endHandler;
}
if (handler != null) {
handler.handle(null);
}
}

private void notifyExceptionHandler(Throwable cause) {
if (exceptionHandler != null) {
exceptionHandler.handle(cause);
}
}

@Override
Expand Down Expand Up @@ -102,27 +146,26 @@ public synchronized long size() {
}

@Override
public HttpServerFileUpload handler(Handler<Buffer> handler) {
pending.handler(handler);
public synchronized HttpServerFileUpload handler(Handler<Buffer> handler) {
dataHandler = handler;
return this;
}

@Override
public HttpServerFileUpload pause() {
pending.pause();
stream.pause();
return this;
}

@Override
public HttpServerFileUpload fetch(long amount) {
pending.resume();
stream.fetch(amount);
return this;
}

@Override
public HttpServerFileUpload resume() {
pending.resume();
checkComplete();
stream.resume();
return this;
}

Expand Down Expand Up @@ -159,67 +202,6 @@ public synchronized boolean isSizeAvailable() {
return !lazyCalculateSize;
}

synchronized void receiveData(Buffer data) {
if (data.length() != 0) {
// Can sometimes receive zero length packets from Netty!
if (lazyCalculateSize) {
size += data.length();
}
doReceiveData(data);
}
}

private synchronized void doReceiveData(Buffer data) {
if (!pending.write(data)) {
req.pause();
}
}

void end() {
synchronized (this) {
ended = true;
}
checkComplete();
}

private void checkComplete() {
AsyncFile toClose;
synchronized (this) {
if (!pending.isEmpty() || pending.isPaused() || !ended || completed) {
return;
}
completed = true;
lazyCalculateSize = false;
toClose = file;
}
if (toClose != null) {
toClose.close(ar -> {
if (ar.failed()) {
notifyExceptionHandler(ar.cause());
}
notifyEndHandler();
});
} else {
notifyEndHandler();
}
}

private void notifyEndHandler() {
Handler<Void> handler;
synchronized (this) {
handler = endHandler;
}
if (handler != null) {
handler.handle(null);
}
}

private void notifyExceptionHandler(Throwable cause) {
if (exceptionHandler != null) {
exceptionHandler.handle(cause);
}
}

@Override
public synchronized AsyncFile file() {
return file;
Expand Down
100 changes: 92 additions & 8 deletions src/main/java/io/vertx/core/http/impl/NettyFileUpload.java
Expand Up @@ -14,7 +14,12 @@
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.multipart.FileUpload;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.impl.InboundBuffer;

import java.io.File;
import java.io.IOException;
Expand All @@ -24,9 +29,8 @@
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
final class NettyFileUpload implements FileUpload {
final class NettyFileUpload implements FileUpload, ReadStream<Buffer> {

private final HttpServerFileUploadImpl upload;
private final String name;
private String contentType;
private String filename;
Expand All @@ -35,28 +39,108 @@ final class NettyFileUpload implements FileUpload {
private boolean completed;
private long maxSize = -1;

NettyFileUpload(HttpServerFileUploadImpl upload, String name, String filename, String contentType, String contentTransferEncoding, Charset charset) {
this.upload = upload;
private final HttpServerRequest request;
private final InboundBuffer<Buffer> pending;
private boolean ended;
private Handler<Void> endHandler;
private Handler<Throwable> exceptionHandler;

NettyFileUpload(Context context, HttpServerRequest request, String name, String filename, String contentType, String contentTransferEncoding, Charset charset) {
this.name = name;
this.filename = filename;
this.contentType = contentType;
this.contentTransferEncoding = contentTransferEncoding;
this.charset = charset;

this.request = request;
this.pending = new InboundBuffer<Buffer>(context)
.drainHandler(v -> request.resume())
.emptyHandler(v -> checkComplete());
}

@Override
public synchronized NettyFileUpload exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler;
return this;
}

@Override
public NettyFileUpload handler(Handler<Buffer> handler) {
pending.handler(handler);
return this;
}

@Override
public NettyFileUpload pause() {
pending.pause();
return this;
}

@Override
public NettyFileUpload resume() {
return fetch(Long.MAX_VALUE);
}

@Override
public NettyFileUpload fetch(long amount) {
pending.fetch(amount);
checkComplete();
return this;
}

@Override
public synchronized NettyFileUpload endHandler(Handler<Void> handler) {
endHandler = handler;
return this;
}

private void receiveData(Buffer data) {
if (data.length() != 0) {
if (!pending.write(data)) {
request.pause();
}
}
}

private void end() {
synchronized (this) {
ended = true;
}
checkComplete();
}

private void checkComplete() {
synchronized (this) {
if (!pending.isEmpty() || pending.isPaused() || !ended) {
return;
}
}
notifyEndHandler();
}

private void notifyEndHandler() {
Handler<Void> handler;
synchronized (this) {
handler = endHandler;
}
if (handler != null) {
handler.handle(null);
}
}

@Override
public void setContent(ByteBuf channelBuffer) throws IOException {
completed = true;
upload.receiveData(Buffer.buffer(channelBuffer));
upload.end();
receiveData(Buffer.buffer(channelBuffer));
end();
}

@Override
public void addContent(ByteBuf channelBuffer, boolean last) throws IOException {
upload.receiveData(Buffer.buffer(channelBuffer));
receiveData(Buffer.buffer(channelBuffer));
if (last) {
completed = true;
upload.end();
end();
}
}

Expand Down
Expand Up @@ -41,10 +41,10 @@ class NettyFileUploadDataFactory extends DefaultHttpDataFactory {

@Override
public FileUpload createFileUpload(HttpRequest httpRequest, String name, String filename, String contentType, String contentTransferEncoding, Charset charset, long size) {
HttpServerFileUploadImpl upload = new HttpServerFileUploadImpl(context, request, name, filename, contentType, contentTransferEncoding, charset,
size);
NettyFileUpload nettyUpload = new NettyFileUpload(upload, name, filename, contentType,
NettyFileUpload nettyUpload = new NettyFileUpload(context, request, name, filename, contentType,
contentTransferEncoding, charset);
HttpServerFileUploadImpl upload = new HttpServerFileUploadImpl(context, nettyUpload, name, filename, contentType, contentTransferEncoding, charset,
size);
Handler<HttpServerFileUpload> uploadHandler = lazyUploadHandler.get();
if (uploadHandler != null) {
uploadHandler.handle(upload);
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/io/vertx/core/http/HttpTest.java
Expand Up @@ -2769,7 +2769,7 @@ public void testHttpClientResponsePausedDuringLastChunk() throws Exception {

@Test
public void testFormUploadEmptyFile() throws Exception {
testFormUploadFile("", false);
testFormUploadFile("", false, false);
}

@Test
Expand All @@ -2789,7 +2789,7 @@ public void testFormUploadLargeFile() throws Exception {

@Test
public void testFormUploadEmptyFileStreamToDisk() throws Exception {
testFormUploadFile("", true);
testFormUploadFile("", true, false);
}

@Test
Expand Down

0 comments on commit 3598209

Please sign in to comment.