Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/range send file #1115

Merged
merged 5 commits into from Aug 19, 2015
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
66 changes: 64 additions & 2 deletions src/main/java/io/vertx/core/http/HttpServerResponse.java
Expand Up @@ -262,7 +262,40 @@ public interface HttpServerResponse extends WriteStream<Buffer> {
* @return a reference to this, so the API can be used fluently
*/
@Fluent
HttpServerResponse sendFile(String filename);
default HttpServerResponse sendFile(String filename) {
return sendFile(filename, 0);
}

/**
* Ask the OS to stream a file as specified by {@code filename} directly
* from disk to the outgoing connection, bypassing userspace altogether
* (where supported by the underlying operating system.
* This is a very efficient way to serve files.<p>
* The actual serve is asynchronous and may not complete until some time after this method has returned.
*
* @param filename path to the file to serve
* @param offset offset to start serving from
* @return a reference to this, so the API can be used fluently
*/
@Fluent
default HttpServerResponse sendFile(String filename, long offset) {
return sendFile(filename, offset, Long.MAX_VALUE);
}

/**
* Ask the OS to stream a file as specified by {@code filename} directly
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When creating an overloaded method, instead of duplicating the javadoc, it's preferable to use {@link} to refer to the version with the full javadoc and just explain the differences.

* from disk to the outgoing connection, bypassing userspace altogether
* (where supported by the underlying operating system.
* This is a very efficient way to serve files.<p>
* The actual serve is asynchronous and may not complete until some time after this method has returned.
*
* @param filename path to the file to serve
* @param offset offset to start serving from
* @param length length to serve to
* @return a reference to this, so the API can be used fluently
*/
@Fluent
HttpServerResponse sendFile(String filename, long offset, long length);

/**
* Like {@link #sendFile(String)} but providing a handler which will be notified once the file has been completely
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e.g. here's an example of using {@link}

Expand All @@ -273,7 +306,36 @@ public interface HttpServerResponse extends WriteStream<Buffer> {
* @return a reference to this, so the API can be used fluently
*/
@Fluent
HttpServerResponse sendFile(String filename, Handler<AsyncResult<Void>> resultHandler);
default HttpServerResponse sendFile(String filename, Handler<AsyncResult<Void>> resultHandler) {
return sendFile(filename, 0, resultHandler);
}

/**
* Like {@link #sendFile(String, long)} but providing a handler which will be notified once the file has been completely
* written to the wire.
*
* @param filename path to the file to serve
* @param offset the offset to serve from
* @param resultHandler handler that will be called on completion
* @return a reference to this, so the API can be used fluently
*/
@Fluent
default HttpServerResponse sendFile(String filename, long offset, Handler<AsyncResult<Void>> resultHandler) {
return sendFile(filename, offset, Long.MAX_VALUE, resultHandler);
}

/**
* Like {@link #sendFile(String, long, long)} but providing a handler which will be notified once the file has been
* completely written to the wire.
*
* @param filename path to the file to serve
* @param offset the offset to serve from
* @param length the length to serve to
* @param resultHandler handler that will be called on completion
* @return a reference to this, so the API can be used fluently
*/
@Fluent
HttpServerResponse sendFile(String filename, long offset, long length, Handler<AsyncResult<Void>> resultHandler);

/**
* Close the underlying TCP connection corresponding to the request.
Expand Down
Expand Up @@ -328,14 +328,14 @@ public void end() {
}

@Override
public HttpServerResponseImpl sendFile(String filename) {
doSendFile(filename, null);
public HttpServerResponseImpl sendFile(String filename, long offset, long length) {
doSendFile(filename, offset, length, null);
return this;
}

@Override
public HttpServerResponse sendFile(String filename, Handler<AsyncResult<Void>> resultHandler) {
doSendFile(filename, resultHandler);
public HttpServerResponse sendFile(String filename, long start, long end, Handler<AsyncResult<Void>> resultHandler) {
doSendFile(filename, start, end, resultHandler);
return this;
}

Expand Down Expand Up @@ -420,16 +420,16 @@ private void end0(ByteBuf data) {
}
}

private void doSendFile(String filename, Handler<AsyncResult<Void>> resultHandler) {
private void doSendFile(String filename, long offset, long length, Handler<AsyncResult<Void>> resultHandler) {
synchronized (conn) {
if (headWritten) {
throw new IllegalStateException("Head already written");
}
checkWritten();
File file = vertx.resolveFile(filename);
long fileLength = file.length();
long contentLength = Math.min(length, file.length() - offset);
if (!contentLengthSet()) {
putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(fileLength));
putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(contentLength));
}
if (!contentTypeSet()) {
int li = filename.lastIndexOf('.');
Expand All @@ -447,7 +447,7 @@ private void doSendFile(String filename, Handler<AsyncResult<Void>> resultHandle
try {
raf = new RandomAccessFile(file, "r");
conn.queueForWrite(response);
conn.sendFile(raf, fileLength);
conn.sendFile(raf, Math.min(offset, file.length()), contentLength);
} catch (IOException e) {
try {
if (raf != null) {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/http/impl/ServerConnection.java
Expand Up @@ -369,8 +369,8 @@ protected boolean supportsFileRegion() {
return super.supportsFileRegion() && channel.pipeline().get(HttpChunkContentCompressor.class) == null;
}

protected ChannelFuture sendFile(RandomAccessFile file, long fileLength) throws IOException {
return super.sendFile(file, fileLength);
protected ChannelFuture sendFile(RandomAccessFile file, long offset, long length) throws IOException {
return super.sendFile(file, offset, length);
}

private void processMessage(Object msg) {
Expand Down
60 changes: 58 additions & 2 deletions src/main/java/io/vertx/core/net/NetSocket.java
Expand Up @@ -108,7 +108,34 @@ public interface NetSocket extends ReadStream<Buffer>, WriteStream<Buffer> {
* @return a reference to this, so the API can be used fluently
*/
@Fluent
NetSocket sendFile(String filename);
default NetSocket sendFile(String filename) {
return sendFile(filename, 0, Long.MAX_VALUE);
}

/**
* Tell the operating system to stream a file as specified by {@code filename} directly from disk to the outgoing connection,
* bypassing userspace altogether (where supported by the underlying operating system. This is a very efficient way to stream files.
*
* @param filename file name of the file to send
* @param offset offset
* @return a reference to this, so the API can be used fluently
*/
@Fluent
default NetSocket sendFile(String filename, long offset) {
return sendFile(filename, offset, Long.MAX_VALUE);
}

/**
* Tell the operating system to stream a file as specified by {@code filename} directly from disk to the outgoing connection,
* bypassing userspace altogether (where supported by the underlying operating system. This is a very efficient way to stream files.
*
* @param filename file name of the file to send
* @param offset offset
* @param length length
* @return a reference to this, so the API can be used fluently
*/
@Fluent
NetSocket sendFile(String filename, long offset, long length);

/**
* Same as {@link #sendFile(String)} but also takes a handler that will be called when the send has completed or
Expand All @@ -119,7 +146,36 @@ public interface NetSocket extends ReadStream<Buffer>, WriteStream<Buffer> {
* @return a reference to this, so the API can be used fluently
*/
@Fluent
NetSocket sendFile(String filename, Handler<AsyncResult<Void>> resultHandler);
default NetSocket sendFile(String filename, Handler<AsyncResult<Void>> resultHandler) {
return sendFile(filename, 0, Long.MAX_VALUE, resultHandler);
}

/**
* Same as {@link #sendFile(String, long)} but also takes a handler that will be called when the send has completed or
* a failure has occurred
*
* @param filename file name of the file to send
* @param offset offset
* @param resultHandler handler
* @return a reference to this, so the API can be used fluently
*/
@Fluent
default NetSocket sendFile(String filename, long offset, Handler<AsyncResult<Void>> resultHandler) {
return sendFile(filename, offset, Long.MAX_VALUE, resultHandler);
}

/**
* Same as {@link #sendFile(String, long, long)} but also takes a handler that will be called when the send has completed or
* a failure has occurred
*
* @param filename file name of the file to send
* @param offset offset
* @param length length
* @param resultHandler handler
* @return a reference to this, so the API can be used fluently
*/
@Fluent
NetSocket sendFile(String filename, long offset, long length, Handler<AsyncResult<Void>> resultHandler);

/**
* @return the remote address for this socket
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/vertx/core/net/impl/ConnectionBase.java
Expand Up @@ -197,15 +197,15 @@ private boolean isSSL() {
return channel.pipeline().get(SslHandler.class) != null;
}

protected ChannelFuture sendFile(RandomAccessFile raf, long fileLength) throws IOException {
protected ChannelFuture sendFile(RandomAccessFile raf, long offset, long length) throws IOException {
// Write the content.
ChannelFuture writeFuture;
if (!supportsFileRegion()) {
// Cannot use zero-copy
writeFuture = writeToChannel(new ChunkedFile(raf, 0, fileLength, 8192));
writeFuture = writeToChannel(new ChunkedFile(raf, offset, length, 8192));
} else {
// No encryption - use zero-copy.
FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, fileLength);
FileRegion region = new DefaultFileRegion(raf.getChannel(), offset, length);
writeFuture = writeToChannel(region);
}
if (writeFuture != null) {
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/vertx/core/net/impl/NetSocketImpl.java
Expand Up @@ -171,20 +171,20 @@ public synchronized NetSocket drainHandler(Handler<Void> drainHandler) {
}

@Override
public NetSocket sendFile(String filename) {
return sendFile(filename, null);
public NetSocket sendFile(String filename, long offset, long length) {
return sendFile(filename, offset, length, null);
}

@Override
public NetSocket sendFile(String filename, final Handler<AsyncResult<Void>> resultHandler) {
public NetSocket sendFile(String filename, long offset, long length, final Handler<AsyncResult<Void>> resultHandler) {
File f = vertx.resolveFile(filename);
if (f.isDirectory()) {
throw new IllegalArgumentException("filename must point to a file and not to a directory");
}
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile(f, "r");
ChannelFuture future = super.sendFile(raf, f.length());
ChannelFuture future = super.sendFile(raf, Math.min(offset, f.length()), Math.min(length, f.length() - offset));
if (resultHandler != null) {
future.addListener(fut -> {
final AsyncResult<Void> res;
Expand Down
53 changes: 45 additions & 8 deletions src/test/java/io/vertx/test/core/HttpTest.java
Expand Up @@ -867,9 +867,12 @@ public void testRequestNPE() {
TestUtils.assertNullPointerException(() -> client.requestAbs((HttpMethod) null, "http://someuri", resp -> {
}));
TestUtils.assertNullPointerException(() -> client.request(HttpMethod.GET, 8080, "localhost", "/somepath", null));
TestUtils.assertNullPointerException(() -> client.request((HttpMethod)null, 8080, "localhost", "/somepath", resp -> {}));
TestUtils.assertNullPointerException(() -> client.request(HttpMethod.GET, 8080, null, "/somepath", resp -> {}));
TestUtils.assertNullPointerException(() -> client.request(HttpMethod.GET, 8080, "localhost", null, resp -> {}));
TestUtils.assertNullPointerException(() -> client.request((HttpMethod) null, 8080, "localhost", "/somepath", resp -> {
}));
TestUtils.assertNullPointerException(() -> client.request(HttpMethod.GET, 8080, null, "/somepath", resp -> {
}));
TestUtils.assertNullPointerException(() -> client.request(HttpMethod.GET, 8080, "localhost", null, resp -> {
}));
}

@Test
Expand Down Expand Up @@ -1682,7 +1685,7 @@ public void testClientExceptionHandlerCalledWhenServerTerminatesConnectionAfterP
}).listen(DEFAULT_HTTP_PORT, onSuccess(s -> {
// Exception handler should be called for any requests in the pipeline if connection is closed
client.request(HttpMethod.GET, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp ->
resp.exceptionHandler(t -> testComplete())).exceptionHandler(error -> fail()).end();
resp.exceptionHandler(t -> testComplete())).exceptionHandler(error -> fail()).end();
}));
await();
}
Expand Down Expand Up @@ -3137,7 +3140,8 @@ public void testSetHandlersAfterListening2() throws Exception {

@Test
public void testListenNoHandlers() throws Exception {
assertIllegalStateException(() -> server.listen(ar -> {}));
assertIllegalStateException(() -> server.listen(ar -> {
}));
}

@Test
Expand Down Expand Up @@ -4117,10 +4121,12 @@ public void testSetHandlersOnEnd() {
server.listen(ar -> {
assertTrue(ar.succeeded());
HttpClientRequest req = client.request(HttpMethod.GET, HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, path);
req.handler(resp -> {});
req.handler(resp -> {
});
req.endHandler(done -> {
try {
req.handler(arg -> {});
req.handler(arg -> {
});
fail();
} catch (Exception ignore) {
}
Expand Down Expand Up @@ -4246,7 +4252,7 @@ public void start() throws Exception {
assertTrue(Vertx.currentContext().isWorkerContext());
assertTrue(Context.isOnWorkerThread());
HttpServer server1 = vertx.createHttpServer(new HttpServerOptions()
.setHost(HttpTestBase.DEFAULT_HTTP_HOST).setPort(HttpTestBase.DEFAULT_HTTP_PORT));
.setHost(HttpTestBase.DEFAULT_HTTP_HOST).setPort(HttpTestBase.DEFAULT_HTTP_PORT));
server1.requestHandler(req -> {
assertTrue(Vertx.currentContext().isWorkerContext());
assertTrue(Context.isOnWorkerThread());
Expand Down Expand Up @@ -4372,6 +4378,37 @@ public void testPauseResumeClientResponse() {
await();
}

@Test
public void testSendOpenRangeFileFromClasspath() {
vertx.createHttpServer(new HttpServerOptions().setPort(8080)).requestHandler(res -> {
res.response().sendFile("webroot/somefile.html", 6);
}).listen(onSuccess(res -> {
vertx.createHttpClient(new HttpClientOptions()).request(HttpMethod.GET, 8080, "localhost", "/", resp -> {
resp.bodyHandler(buff -> {
assertTrue(buff.toString().startsWith("<body>blah</body></html>"));
testComplete();
});
}).end();
}));
await();
}

@Test
public void testSendRangeFileFromClasspath() {
vertx.createHttpServer(new HttpServerOptions().setPort(8080)).requestHandler(res -> {
res.response().sendFile("webroot/somefile.html", 6, 6);
}).listen(onSuccess(res -> {
vertx.createHttpClient(new HttpClientOptions()).request(HttpMethod.GET, 8080, "localhost", "/", resp -> {
resp.bodyHandler(buff -> {
assertEquals("<body>", buff.toString());
testComplete();
});
}).end();
}));
await();
}


private void pausingServer(Consumer<HttpServer> consumer) {
server.requestHandler(req -> {
req.response().setChunked(true);
Expand Down