Skip to content

Commit

Permalink
Add support for HTTP compression
Browse files Browse the repository at this point in the history
  • Loading branch information
Norman Maurer committed Oct 10, 2013
1 parent ffa2e50 commit 633ccaa
Show file tree
Hide file tree
Showing 13 changed files with 200 additions and 21 deletions.
10 changes: 10 additions & 0 deletions vertx-core/src/main/java/org/vertx/java/core/http/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,14 @@ public interface HttpClient extends ClientSSLSupport<HttpClient>, TCPSupport<Htt
*/
int getConnectTimeout();

/**
* Set if the {@link HttpClient} should try to use compression.
*/
HttpClient setTryUseCompression(boolean tryUseCompression);

/**
* Returns {@code true} if the {@link HttpClient} should try to use compression.
*/
boolean getTryUseCompression();

}
10 changes: 10 additions & 0 deletions vertx-core/src/main/java/org/vertx/java/core/http/HttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,14 @@ public interface HttpServer extends ServerSSLSupport<HttpServer>, ServerTCPSuppo
* is complete.
*/
void close(Handler<AsyncResult<Void>> doneHandler);

/**
* Set if the {@link HttpServer} should compress the http response if the connected client supports it.
*/
HttpServer setCompressionSupported(boolean compressionSupported);

/**
* Returns {@code true} if the {@link HttpServer} should compress the http response if the connected client supports it.
*/
boolean isCompressionSupported();
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,11 @@ public void run() {
private void handshakeComplete(ChannelHandlerContext ctx, FullHttpResponse response) {
handshaking = true;
try {
//ctx.pipeline().addAfter(ctx.name(), "websocketConverter", WebSocketConvertHandler.INSTANCE);
ChannelHandler handler = ctx.pipeline().get(HttpContentDecompressor.class);
if (handler != null) {
// remove decompressor as its not needed anymore once connection was upgraded to websockets
ctx.pipeline().remove(handler);
}
ws = new DefaultWebSocket(vertx, ClientConnection.this);
handshaker.finishHandshake(channel, response);
log.debug("WebSocket handshake complete");
Expand Down Expand Up @@ -354,9 +358,15 @@ NetSocket createNetSocket() {
// Flush out all pending data
endReadAndFlush();


// remove old http handlers and replace the old handler with one that handle plain sockets
channel.pipeline().remove("codec");
channel.pipeline().replace("handler", "handler", new VertxNetHandler(client.vertx, connectionMap) {
ChannelPipeline pipeline = channel.pipeline();
ChannelHandler inflater = pipeline.get(HttpContentDecompressor.class);
if (inflater != null) {
pipeline.remove(inflater);
}
pipeline.remove("codec");
pipeline.replace("handler", "handler", new VertxNetHandler(client.vertx, connectionMap) {
@Override
public void exceptionCaught(ChannelHandlerContext chctx, Throwable t) throws Exception {
// remove from the real mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
Expand Down Expand Up @@ -58,6 +55,8 @@ public class DefaultHttpClient implements HttpClient {
private Handler<Throwable> exceptionHandler;
private int port = 80;
private String host = "localhost";
private boolean tryUseCompression;

private final HttpPool pool = new PriorityHttpConnectionPool() {
protected void connect(Handler<ClientConnection> connectHandler, Handler<Throwable> connectErrorHandler, DefaultContext context) {
internalConnect(connectHandler, connectErrorHandler);
Expand Down Expand Up @@ -486,6 +485,18 @@ public boolean isUsePooledBuffers() {
return tcpHelper.isUsePooledBuffers();
}

@Override
public HttpClient setTryUseCompression(boolean tryUseCompression) {
checkClosed();
this.tryUseCompression = tryUseCompression;
return this;
}

@Override
public boolean getTryUseCompression() {
return tryUseCompression;
}

void getConnection(Handler<ClientConnection> handler, Handler<Throwable> connectionExceptionHandler, DefaultContext context) {
pool.getConnection(handler, connectionExceptionHandler, context);
}
Expand Down Expand Up @@ -538,6 +549,9 @@ protected void initChannel(Channel ch) throws Exception {
}

pipeline.addLast("codec", new HttpClientCodec());
if (tryUseCompression) {
pipeline.addLast("inflater", new HttpContentDecompressor());
}
pipeline.addLast("handler", new ClientHandler());
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class DefaultHttpServer implements HttpServer, Closeable {
private ChannelGroup serverChannelGroup;
private boolean listening;
private String serverOrigin;
private boolean compressionSupported;

private ChannelFuture bindFuture;
private ServerID id;
Expand Down Expand Up @@ -185,7 +186,10 @@ protected void initChannel(Channel ch) throws Exception {
pipeline.addLast("flashpolicy", new FlashPolicyHandler());
pipeline.addLast("httpDecoder", new HttpRequestDecoder());
pipeline.addLast("httpEncoder", new HttpResponseEncoder());
if (tcpHelper.isSSL()) {
if (compressionSupported) {
pipeline.addLast("deflater", new HttpChunkContentCompressor());
}
if (tcpHelper.isSSL() || compressionSupported) {
// only add ChunkedWriteHandler when SSL is enabled otherwise it is not needed as FileRegion is used.
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); // For large file / sendfile support
}
Expand Down Expand Up @@ -490,6 +494,18 @@ public boolean isUsePooledBuffers() {
return tcpHelper.isUsePooledBuffers();
}

@Override
public HttpServer setCompressionSupported(boolean compressionSupported) {
checkListening();
this.compressionSupported = compressionSupported;
return this;
}

@Override
public boolean isCompressionSupported() {
return compressionSupported;
}

private void actualClose(final DefaultContext closeContext, final Handler<AsyncResult<Void>> done) {
if (id != null) {
vertx.sharedHttpServers().remove(id);
Expand Down Expand Up @@ -710,6 +726,11 @@ public void run() {
firstHandler = wsHandler;
}
} else {
ChannelHandler handler = ctx.pipeline().get(HttpChunkContentCompressor.class);
if (handler != null) {
// remove compressor as its not needed anymore once connection was upgraded to websockets
ctx.pipeline().remove(handler);
}
ws.connectNow();
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.vertx.java.core.http.impl;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.HttpContentCompressor;


/**
* @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
*/
final class HttpChunkContentCompressor extends HttpContentCompressor {

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBuf) {
// convert ByteBuf to HttpContent to make it work with compression. This is needed as we use the
// ChunkedWriteHandler to send files when compression is enabled.
msg = new DefaultHttpContent((ByteBuf) msg);
}
super.write(ctx, msg, promise);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@

package org.vertx.java.core.http.impl;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.*;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
Expand Down Expand Up @@ -135,10 +132,17 @@ NetSocket createNetSocket() {
endReadAndFlush();

// remove old http handlers and replace the old handler with one that handle plain sockets
channel.pipeline().remove("httpDecoder");
if (channel.pipeline().get("chunkedWriter") != null) {
channel.pipeline().remove("chunkedWriter");
ChannelPipeline pipeline = channel.pipeline();
ChannelHandler compressor = pipeline.get(HttpChunkContentCompressor.class);
if (compressor != null) {
pipeline.remove(compressor);
}

pipeline.remove("httpDecoder");
if (pipeline.get("chunkedWriter") != null) {
pipeline.remove("chunkedWriter");
}

channel.pipeline().replace("handler", "handler", new VertxNetHandler(server.vertx, connectionMap) {
@Override
public void exceptionCaught(ChannelHandlerContext chctx, Throwable t) throws Exception {
Expand Down Expand Up @@ -281,8 +285,9 @@ protected void addFuture(Handler<AsyncResult<Void>> doneHandler, ChannelFuture f
super.addFuture(doneHandler, future);
}

protected boolean isSSL() {
return super.isSSL();
@Override
protected boolean supportsFileRegion() {
return super.supportsFileRegion() && channel.pipeline().get(HttpChunkContentCompressor.class) == null;
}

protected ChannelFuture sendFile(File file) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
/**
* @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
*/

public abstract class VertxHttpHandler<C extends ConnectionBase> extends VertxHandler<C> {
private final VertxInternal vertx;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ protected void handleHandlerException(Throwable t) {
vertx.reportException(t);
}

protected boolean isSSL() {
protected boolean supportsFileRegion() {
return !isSSL();
}

private boolean isSSL() {
return channel.pipeline().get(SslHandler.class) != null;
}

Expand All @@ -183,8 +187,8 @@ protected ChannelFuture sendFile(File file) {

// Write the content.
ChannelFuture writeFuture;
if (isSSL()) {
// Cannot use zero-copy with HTTPS.
if (!supportsFileRegion()) {
// Cannot use zero-copy
writeFuture = write(new ChunkedFile(raf, 0, fileLength, 8192));
} else {
// No encryption - use zero-copy.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.vertx.java.tests.core.http;

import vertx.tests.core.http.HttpCompressionTestClient;

/**
* @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
*/
public class JavaHttpCompressionTest extends JavaHttpTest {

@Override
protected void startApp() throws Exception {
startApp(HttpCompressionTestClient.class.getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public class JavaHttpTest extends TestBase {
@Override
protected void setUp() throws Exception {
super.setUp();
startApp();
}

protected void startApp() throws Exception {
startApp(HttpTestClient.class.getName());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package vertx.tests.core.http;


/**
* @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
*/
public class HttpCompressionTestClient extends HttpTestClient {

@Override
protected boolean compression() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,16 @@ public class HttpTestClient extends TestClientBase {
private HttpClient client;
private HttpServer server;

protected boolean compression() {
return false;
}

@Override
public void start() {
super.start();
tu.appReady();
client = vertx.createHttpClient().setHost("localhost").setPort(8080);
client.setTryUseCompression(compression());
}

@Override
Expand All @@ -65,6 +70,7 @@ public void handle(AsyncResult<Void> result) {

private void startServer(Handler<HttpServerRequest> serverHandler, AsyncResultHandler<HttpServer> handler) {
server = vertx.createHttpServer();
server.setCompressionSupported(compression());
server.requestHandler(serverHandler);
server.listen(8080, "localhost", handler);
}
Expand Down

0 comments on commit 633ccaa

Please sign in to comment.