Skip to content

Commit

Permalink
Decouple HttpClientRequest/HttpClientResponse from http 1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 13, 2016
1 parent 6201a0d commit 61a840e
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 82 deletions.
5 changes: 5 additions & 0 deletions src/main/java/io/vertx/core/http/HttpClientRequest.java
Expand Up @@ -86,6 +86,11 @@ public interface HttpClientRequest extends WriteStream<Buffer>, ReadStream<HttpC
@Override @Override
HttpClientRequest endHandler(Handler<Void> endHandler); HttpClientRequest endHandler(Handler<Void> endHandler);


HttpVersion getVersion();

@Fluent
HttpClientRequest setVersion(HttpVersion version);

/** /**
* If chunked is true then the request will be set into HTTP chunked mode * If chunked is true then the request will be set into HTTP chunked mode
* *
Expand Down
90 changes: 80 additions & 10 deletions src/main/java/io/vertx/core/http/impl/ClientConnection.java
Expand Up @@ -16,16 +16,18 @@


package io.vertx.core.http.impl; package io.vertx.core.http.impl;


import io.netty.buffer.ByteBuf;
import io.netty.channel.*; import io.netty.channel.*;
import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.*; import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.MultiMap; import io.vertx.core.MultiMap;
import io.vertx.core.VertxException; import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.WebSocket; import io.vertx.core.http.*;
import io.vertx.core.http.WebsocketVersion; import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal; import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.ContextImpl; import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.VertxInternal;
Expand All @@ -43,6 +45,14 @@
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;


import static io.vertx.core.http.HttpHeaders.ACCEPT_ENCODING;
import static io.vertx.core.http.HttpHeaders.CLOSE;
import static io.vertx.core.http.HttpHeaders.CONNECTION;
import static io.vertx.core.http.HttpHeaders.DEFLATE_GZIP;
import static io.vertx.core.http.HttpHeaders.HOST;
import static io.vertx.core.http.HttpHeaders.KEEP_ALIVE;
import static io.vertx.core.http.HttpHeaders.TRANSFER_ENCODING;

/** /**
* *
* This class is optimised for performance when used on the same event loop. However it can be used safely from other threads. * This class is optimised for performance when used on the same event loop. However it can be used safely from other threads.
Expand All @@ -52,7 +62,7 @@
* *
* @author <a href="http://tfox.org">Tim Fox</a> * @author <a href="http://tfox.org">Tim Fox</a>
*/ */
class ClientConnection extends ConnectionBase { class ClientConnection extends ConnectionBase implements HttpClientConnection {


private static final Logger log = LoggerFactory.getLogger(ClientConnection.class); private static final Logger log = LoggerFactory.getLogger(ClientConnection.class);


Expand Down Expand Up @@ -288,18 +298,18 @@ void handleResponseEnd(LastHttpContent trailer) {


// We don't signal response end for a 100-continue response as a real response will follow // We don't signal response end for a 100-continue response as a real response will follow
// Also we keep the connection open for an HTTP CONNECT // Also we keep the connection open for an HTTP CONNECT
if (currentResponse.statusCode() != 100 && requestForResponse.getRequest().getMethod() != HttpMethod.CONNECT) { if (currentResponse.statusCode() != 100 && requestForResponse.getMethod() != io.vertx.core.http.HttpMethod.CONNECT) {


boolean close = false; boolean close = false;
// See https://tools.ietf.org/html/rfc7230#section-6.3 // See https://tools.ietf.org/html/rfc7230#section-6.3
String responseConnectionHeader = currentResponse.getHeader(HttpHeaders.Names.CONNECTION); String responseConnectionHeader = currentResponse.getHeader(HttpHeaders.Names.CONNECTION);
HttpVersion protocolVersion = requestForResponse.getRequest().getProtocolVersion(); io.vertx.core.http.HttpVersion protocolVersion = requestForResponse.getVersion();
String requestConnectionHeader = requestForResponse.getRequest().headers().get(HttpHeaders.Names.CONNECTION); String requestConnectionHeader = requestForResponse.headers().get(HttpHeaders.Names.CONNECTION);
// We don't need to protect against concurrent changes on forceClose as it only goes from false -> true // We don't need to protect against concurrent changes on forceClose as it only goes from false -> true
if (HttpHeaders.Values.CLOSE.equalsIgnoreCase(responseConnectionHeader) || HttpHeaders.Values.CLOSE.equalsIgnoreCase(requestConnectionHeader)) { if (HttpHeaders.Values.CLOSE.equalsIgnoreCase(responseConnectionHeader) || HttpHeaders.Values.CLOSE.equalsIgnoreCase(requestConnectionHeader)) {
// In all cases, if we have a close connection option then we SHOULD NOT treat the connection as persistent // In all cases, if we have a close connection option then we SHOULD NOT treat the connection as persistent
close = true; close = true;
} else if (protocolVersion == HttpVersion.HTTP_1_0 && !HttpHeaders.Values.KEEP_ALIVE.equalsIgnoreCase(responseConnectionHeader)) { } else if (protocolVersion == io.vertx.core.http.HttpVersion.HTTP_1_0 && !HttpHeaders.Values.KEEP_ALIVE.equalsIgnoreCase(responseConnectionHeader)) {
// In the HTTP/1.0 case both request/response need a keep-alive connection header the connection to be persistent // In the HTTP/1.0 case both request/response need a keep-alive connection header the connection to be persistent
// currently Vertx forces the Connection header if keepalive is enabled for 1.0 // currently Vertx forces the Connection header if keepalive is enabled for 1.0
close = true; close = true;
Expand Down Expand Up @@ -332,10 +342,70 @@ protected synchronized void handleClosed() {
} }
} }


protected ContextImpl getContext() { public ContextImpl getContext() {
return super.getContext(); return super.getContext();
} }


private HttpRequest createRequest(HttpVersion version, io.vertx.core.http.HttpMethod method, String uri, MultiMap headers) {
DefaultHttpRequest request = new DefaultHttpRequest(UriUtils.toNettyHttpVersion(version), UriUtils.toNettyHttpMethod(method), uri, false);
if (headers != null) {
for (Map.Entry<String, String> header : headers) {
// Todo : multi valued headers
request.headers().add(header.getKey(), header.getValue());
}
}
return request;
}

private void prepareHeaders(HttpRequest request, boolean chunked) {
HttpHeaders headers = request.headers();
headers.remove(TRANSFER_ENCODING);
if (!headers.contains(HOST)) {
request.headers().set(HOST, hostHeader());
}
if (chunked) {
HttpHeaders.setTransferEncodingChunked(request);
}
if (client.getOptions().isTryUseCompression() && request.headers().get(ACCEPT_ENCODING) == null) {
// if compression should be used but nothing is specified by the user support deflate and gzip.
request.headers().set(ACCEPT_ENCODING, DEFLATE_GZIP);
}
if (!client.getOptions().isKeepAlive() && client.getOptions().getProtocolVersion() == io.vertx.core.http.HttpVersion.HTTP_1_1) {
request.headers().set(CONNECTION, CLOSE);
} else if (client.getOptions().isKeepAlive() && client.getOptions().getProtocolVersion() == io.vertx.core.http.HttpVersion.HTTP_1_0) {
request.headers().set(CONNECTION, KEEP_ALIVE);
}
}

public void writeHead(HttpVersion version, io.vertx.core.http.HttpMethod method, String uri, MultiMap headers, boolean chunked) {
HttpRequest request = createRequest(version, method, uri, headers);
prepareHeaders(request, chunked);
writeToChannel(request);
}

public void writeHeadWithContent(io.vertx.core.http.HttpVersion version, io.vertx.core.http.HttpMethod method, String uri, MultiMap headers, boolean chunked, ByteBuf buf, boolean end) {
HttpRequest request = createRequest(version, method, uri, headers);
prepareHeaders(request, chunked);
if (end) {
writeToChannel(new AssembledFullHttpRequest(request, buf));
} else {
writeToChannel(new AssembledHttpRequest(request, buf));
}
}

@Override
public void writeBuffer(ByteBuf buff, boolean end) {
if (end) {
if (buff.isReadable()) {
writeToChannel(new DefaultLastHttpContent(buff, false));
} else {
writeToChannel(LastHttpContent.EMPTY_LAST_CONTENT);
}
} else {
writeToChannel(new DefaultHttpContent(buff));
}
}

@Override @Override
protected synchronized void handleException(Throwable e) { protected synchronized void handleException(Throwable e) {
super.handleException(e); super.handleException(e);
Expand All @@ -354,7 +424,7 @@ synchronized void setCurrentRequest(HttpClientRequestImpl req) {
this.requests.add(req); this.requests.add(req);
} }


synchronized void endRequest() { public synchronized void endRequest() {
if (currentRequest == null) { if (currentRequest == null) {
throw new IllegalStateException("No write in progress"); throw new IllegalStateException("No write in progress");
} }
Expand All @@ -378,7 +448,7 @@ public synchronized void close() {
} }
} }


NetSocket createNetSocket() { public NetSocket createNetSocket() {
// connection was upgraded to raw TCP socket // connection was upgraded to raw TCP socket
NetSocketImpl socket = new NetSocketImpl(vertx, channel, context, client.getSslHelper(), true, metrics, metric); NetSocketImpl socket = new NetSocketImpl(vertx, channel, context, client.getSslHelper(), true, metrics, metric);
Map<Channel, NetSocketImpl> connectionMap = new HashMap<>(1); Map<Channel, NetSocketImpl> connectionMap = new HashMap<>(1);
Expand Down
34 changes: 34 additions & 0 deletions src/main/java/io/vertx/core/http/impl/Http2ConnectionManager.java
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2011-2013 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.vertx.core.http.impl;

import io.vertx.core.Handler;
import io.vertx.core.impl.ContextImpl;

import java.util.function.BooleanSupplier;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class Http2ConnectionManager {

public void getConnection(int port, String host, Handler<ClientConnection> handler, Handler<Throwable> connectionExceptionHandler,
ContextImpl context, BooleanSupplier canceled) {
connectionExceptionHandler.handle(new UnsupportedOperationException());
}

}
50 changes: 50 additions & 0 deletions src/main/java/io/vertx/core/http/impl/HttpClientConnection.java
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2011-2013 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.vertx.core.http.impl;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.vertx.core.Context;
import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.net.NetSocket;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
interface HttpClientConnection {

void writeHead(HttpVersion version, HttpMethod method, String uri, MultiMap headers, boolean chunked);
void writeHeadWithContent(HttpVersion vertsion, HttpMethod method, String uri, MultiMap headers, boolean chunked, ByteBuf buf, boolean end);
void writeBuffer(ByteBuf buf, boolean end);
String hostHeader();
Context getContext();
void doSetWriteQueueMaxSize(int size);
boolean isNotWritable();
void handleInterestedOpsChanged();
void endRequest();

void doPause();
void doResume();

// Try to remove that ?
void reportBytesWritten(long numberOfBytes);
void reportBytesRead(long s);

NetSocket createNetSocket();
}
11 changes: 9 additions & 2 deletions src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Expand Up @@ -28,6 +28,7 @@
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*; import io.vertx.core.http.*;
import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.ws.WebSocketFrameImpl; import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal; import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.Closeable; import io.vertx.core.Closeable;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class HttpClientImpl implements HttpClient, MetricsProvider {
private final Map<Channel, ClientConnection> connectionMap = new ConcurrentHashMap<>(); private final Map<Channel, ClientConnection> connectionMap = new ConcurrentHashMap<>();
private final ContextImpl creatingContext; private final ContextImpl creatingContext;
private final ConnectionManager pool; private final ConnectionManager pool;
private final Http2ConnectionManager pool2;
private final Closeable closeHook; private final Closeable closeHook;
private final SSLHelper sslHelper; private final SSLHelper sslHelper;
private final HttpClientMetrics metrics; private final HttpClientMetrics metrics;
Expand All @@ -93,6 +95,7 @@ protected void connect(String host, int port, Handler<ClientConnection> connectH
internalConnect(context, port, host, connectHandler, connectErrorHandler, listener); internalConnect(context, port, host, connectHandler, connectErrorHandler, listener);
} }
}; };
pool2 = new Http2ConnectionManager();
this.metrics = vertx.metricsSPI().createMetrics(this, options); this.metrics = vertx.metricsSPI().createMetrics(this, options);
} }


Expand Down Expand Up @@ -669,9 +672,13 @@ void getConnection(int port, String host, Handler<ClientConnection> handler, Han
pool.getConnection(port, host, handler, connectionExceptionHandler, context, () -> false); pool.getConnection(port, host, handler, connectionExceptionHandler, context, () -> false);
} }


void getConnection(int port, String host, Handler<ClientConnection> handler, Handler<Throwable> connectionExceptionHandler, void getConnection(HttpVersion version, int port, String host, Handler<ClientConnection> handler, Handler<Throwable> connectionExceptionHandler,
ContextImpl context, BooleanSupplier canceled) { ContextImpl context, BooleanSupplier canceled) {
pool.getConnection(port, host, handler, connectionExceptionHandler, context, canceled); if (version == HttpVersion.HTTP_2) {
pool2.getConnection(port, host, handler, connectionExceptionHandler, context, canceled);
} else {
pool.getConnection(port, host, handler, connectionExceptionHandler, context, canceled);
}
} }


/** /**
Expand Down

0 comments on commit 61a840e

Please sign in to comment.