Skip to content

Commit

Permalink
Fix HTTP/2 server response send file to use the channel registration …
Browse files Browse the repository at this point in the history
…that is asynchronous
  • Loading branch information
vietj committed Sep 20, 2017
1 parent 0d3aec1 commit 77bf582
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 15 deletions.
16 changes: 9 additions & 7 deletions src/main/java/io/vertx/core/http/impl/FileStreamChannel.java
Expand Up @@ -61,7 +61,7 @@ class FileStreamChannel extends AbstractChannel {
private final VertxHttp2Stream stream;

FileStreamChannel(
Handler<AsyncResult<Long>> resultHandler,
Future<Long> result,
VertxHttp2Stream stream,
long offset,
long length) {
Expand All @@ -78,17 +78,19 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
if (evt instanceof RandomAccessFile) {
ChannelFuture fut = ctx.writeAndFlush(new ChunkedFile((RandomAccessFile) evt, offset, length, 8192 /* default chunk size */ ));
fut.addListener(f -> {
if (resultHandler != null) {
if (f.isSuccess()) {
resultHandler.handle(Future.succeededFuture(bytesWritten));
} else {
resultHandler.handle(Future.failedFuture(f.cause()));
}
if (f.isSuccess()) {
result.tryComplete(bytesWritten);
} else {
result.tryFail(f.cause());
}
fut.addListener(ChannelFutureListener.CLOSE);
});
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
result.tryFail(cause);
}
});
}
});
Expand Down
21 changes: 17 additions & 4 deletions src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java
Expand Up @@ -18,6 +18,8 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
Expand Down Expand Up @@ -550,7 +552,8 @@ public HttpServerResponse sendFile(String filename, long offset, long length, Ha
}
checkSendHeaders(false);

FileStreamChannel fileChannel = new FileStreamChannel(ar -> {
Future<Long> result = Future.future();
result.setHandler(ar -> {
if (ar.succeeded()) {
bytesWritten += ar.result();
end();
Expand All @@ -560,10 +563,20 @@ public HttpServerResponse sendFile(String filename, long offset, long length, Ha
resultHandler.handle(Future.succeededFuture());
});
}
}, stream, offset, contentLength);
});

FileStreamChannel fileChannel = new FileStreamChannel(result, stream, offset, contentLength);
drainHandler(fileChannel.drainHandler);
ctx.channel().eventLoop().register(fileChannel);
fileChannel.pipeline().fireUserEventTriggered(raf);
ctx.channel()
.eventLoop()
.register(fileChannel)
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
fileChannel.pipeline().fireUserEventTriggered(raf);
} else {
result.tryFail(future.cause());
}
});
}
return this;
}
Expand Down
20 changes: 16 additions & 4 deletions src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java
Expand Up @@ -17,6 +17,7 @@
package io.vertx.core.http.impl;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.CharsetUtil;
import io.vertx.codegen.annotations.Nullable;
Expand Down Expand Up @@ -218,16 +219,27 @@ public NetSocket sendFile(String filename, long offset, long length, Handler<Asy

long contentLength = Math.min(length, file.length() - offset);

FileStreamChannel fileChannel = new FileStreamChannel(ar -> {
Future<Long> result = Future.future();
result.setHandler(ar -> {
if (resultHandler != null) {
resultCtx.runOnContext(v -> {
resultHandler.handle(Future.succeededFuture());
});
}
}, this, offset, contentLength);
});

FileStreamChannel fileChannel = new FileStreamChannel(result, this, offset, contentLength);
drainHandler(fileChannel.drainHandler);
handlerContext.channel().eventLoop().register(fileChannel);
fileChannel.pipeline().fireUserEventTriggered(raf);
handlerContext.channel()
.eventLoop()
.register(fileChannel)
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
fileChannel.pipeline().fireUserEventTriggered(raf);
} else {
result.tryFail(future.cause());
}
});
}
return this;
}
Expand Down

0 comments on commit 77bf582

Please sign in to comment.