Skip to content

Commit

Permalink
Cleanup and doc
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Nov 10, 2017
1 parent b664aa4 commit c7881d0
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 87 deletions.
Expand Up @@ -56,8 +56,7 @@ public Http2ClientConnection(ConnectionListener<HttpClientConnection> listener,
HttpClientImpl client, HttpClientImpl client,
ContextImpl context, ContextImpl context,
VertxHttp2ConnectionHandler connHandler, VertxHttp2ConnectionHandler connHandler,
HttpClientMetrics metrics, HttpClientMetrics metrics) {
Handler<AsyncResult<HttpClientConnection>> resultHandler) {
super(context, connHandler); super(context, connHandler);
this.metrics = metrics; this.metrics = metrics;
this.queueMetric = queueMetric; this.queueMetric = queueMetric;
Expand Down
106 changes: 50 additions & 56 deletions src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java
Expand Up @@ -42,10 +42,7 @@
import javax.net.ssl.SSLHandshakeException; import javax.net.ssl.SSLHandshakeException;


/** /**
* The ChannelConnector performs the channel configuration and connection according to the * Performs the channel configuration and connection according to the client options and the protocol version.
* client options and the protocol version.
* When the channel connects or fails to connect, it calls back the ConnQueue that initiated the
* connection.
*/ */
class HttpChannelConnector implements ConnectionProvider<HttpClientConnection> { class HttpChannelConnector implements ConnectionProvider<HttpClientConnection> {


Expand Down Expand Up @@ -239,7 +236,7 @@ private void handshakeFailure(Channel ch, Throwable cause, Handler<AsyncResult<H
connectFailed(ch, handler, sslException); connectFailed(ch, handler, sslException);
} }


private void http1xConnected(ConnectionListener<HttpClientConnection> queue, private void http1xConnected(ConnectionListener<HttpClientConnection> listener,
HttpVersion version, HttpVersion version,
String host, String host,
int port, int port,
Expand All @@ -248,60 +245,57 @@ private void http1xConnected(ConnectionListener<HttpClientConnection> queue,
ContextImpl context, ContextImpl context,
Channel ch, Channel ch,
Handler<AsyncResult<HttpClientConnection>> handler) { Handler<AsyncResult<HttpClientConnection>> handler) {
synchronized (this) { ClientHandler clientHandler = new ClientHandler(
ClientHandler clientHandler = new ClientHandler( listener,
queue, context,
context, version,
version, host,
host, port,
port, ssl,
ssl, client,
client, endpointMetric,
endpointMetric, client.metrics());
client.metrics()); clientHandler.addHandler(conn -> {
clientHandler.addHandler(conn -> { handler.handle(Future.succeededFuture(conn));
handler.handle(Future.succeededFuture(conn)); });
}); clientHandler.removeHandler(conn -> {
clientHandler.removeHandler(conn -> { listener.onClose(conn, ch);
queue.onClose(conn, ch); });
}); ch.pipeline().addLast("handler", clientHandler);
ch.pipeline().addLast("handler", clientHandler);
}
} }


private void http2Connected(ConnectionListener<HttpClientConnection> queue, Object endpointMetric, ContextImpl context, Channel ch, Handler<AsyncResult<HttpClientConnection>> resultHandler) { private void http2Connected(ConnectionListener<HttpClientConnection> listener,
Object endpointMetric,
ContextImpl context,
Channel ch,
Handler<AsyncResult<HttpClientConnection>> resultHandler) {
try { try {
synchronized (this) { boolean upgrade;
boolean upgrade; upgrade = ch.pipeline().get(SslHandler.class) == null && options.isHttp2ClearTextUpgrade();
upgrade = ch.pipeline().get(SslHandler.class) == null && options.isHttp2ClearTextUpgrade(); VertxHttp2ConnectionHandler<Http2ClientConnection> handler = new VertxHttp2ConnectionHandlerBuilder<Http2ClientConnection>(ch)
VertxHttp2ConnectionHandler<Http2ClientConnection> handler = new VertxHttp2ConnectionHandlerBuilder<Http2ClientConnection>(ch) .server(false)
.server(false) .clientUpgrade(upgrade)
.clientUpgrade(upgrade) .useCompression(client.getOptions().isTryUseCompression())
.useCompression(client.getOptions().isTryUseCompression()) .initialSettings(client.getOptions().getInitialSettings())
.initialSettings(client.getOptions().getInitialSettings()) .connectionFactory(connHandler -> new Http2ClientConnection(listener, endpointMetric, client, context, connHandler, metrics))
.connectionFactory(connHandler -> { .logEnabled(options.getLogActivity())
Http2ClientConnection conn = new Http2ClientConnection(queue, endpointMetric, client, context, connHandler, metrics, resultHandler); .build();
return conn; handler.addHandler(conn -> {
}) if (options.getHttp2ConnectionWindowSize() > 0) {
.logEnabled(options.getLogActivity()) conn.setWindowSize(options.getHttp2ConnectionWindowSize());
.build(); }
handler.addHandler(conn -> { if (metrics != null) {
if (options.getHttp2ConnectionWindowSize() > 0) { Object metric = metrics.connected(conn.remoteAddress(), conn.remoteName());
conn.setWindowSize(options.getHttp2ConnectionWindowSize()); conn.metric(metric);
} }
if (metrics != null) { resultHandler.handle(Future.succeededFuture(conn));
Object metric = metrics.connected(conn.remoteAddress(), conn.remoteName()); });
conn.metric(metric); handler.removeHandler(conn -> {
} if (metrics != null) {
resultHandler.handle(Future.succeededFuture(conn)); metrics.endpointDisconnected(endpointMetric, conn.metric());
}); }
handler.removeHandler(conn -> { listener.onClose(conn, ch);
if (metrics != null) { });
metrics.endpointDisconnected(endpointMetric, conn.metric());
}
queue.onClose(conn, ch);
});
}
} catch (Exception e) { } catch (Exception e) {
connectFailed(ch, resultHandler, e); connectFailed(ch, resultHandler, e);
} }
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Expand Up @@ -159,7 +159,7 @@ public HttpClientImpl(VertxInternal vertx, HttpClientOptions options) {
} }


ConnectionProvider<HttpClientConnection> connector = new HttpChannelConnector(this); ConnectionProvider<HttpClientConnection> connector = new HttpChannelConnector(this);
Function<SocketAddress, ConnectionPool<HttpClientConnection>> poolFactory = sa -> new HttpClientPool(options.getProtocolVersion(), options, sa.host(), sa.port()); Function<SocketAddress, ConnectionPool<HttpClientConnection>> poolFactory = sa -> new HttpConnectionPool(options.getProtocolVersion(), options);


wsCM = new ConnectionManager<>(vertx, metrics, connector, poolFactory, options.getMaxWaitQueueSize()); wsCM = new ConnectionManager<>(vertx, metrics, connector, poolFactory, options.getMaxWaitQueueSize());
httpCM = new ConnectionManager<>(vertx, metrics, connector, poolFactory, options.getMaxWaitQueueSize()); httpCM = new ConnectionManager<>(vertx, metrics, connector, poolFactory, options.getMaxWaitQueueSize());
Expand Down
Expand Up @@ -23,40 +23,33 @@


import java.util.*; import java.util.*;


class HttpClientPool implements ConnectionPool<HttpClientConnection> { /**
* The logic for the connection pool for HTTP:
*
* - HTTP/1.x pools several connections
* - HTTP/2 uses can multiplex on a single connections but can handle several connections
* <p/>
* When this pool is initialized with an HTTP/2 pool, this pool can be changed to an HTTP/1/1
* pool if the remote server does not support HTTP/2 or after ALPN negotiation.
* <p/>
* In this case further connections will be HTTP/1.1 connections, until the pool is closed.
*/
class HttpConnectionPool implements ConnectionPool<HttpClientConnection> {


private ConnectionPool<? extends HttpClientConnection> current; private ConnectionPool<? extends HttpClientConnection> current;
private final HttpClientOptions options; private final HttpClientOptions options;
private final String host;
private final int port;
private final boolean ssl;
private HttpVersion version; private HttpVersion version;


HttpClientPool(HttpVersion version, HttpClientOptions options, String host, int port) { HttpConnectionPool(HttpVersion version, HttpClientOptions options) {
this.version = version; this.version = version;
this.options = options; this.options = options;
this.host = host;
this.port = port;
this.ssl = options.isSsl();
if (version == HttpVersion.HTTP_2) { if (version == HttpVersion.HTTP_2) {
current = new Http2(); current = new Http2();
} else { } else {
current = new Http1x(); current = new Http1x();
} }
} }


String host() {
return host;
}

int port() {
return port;
}

boolean ssl() {
return ssl;
}

private void fallbackToHttp1x(HttpVersion version) { private void fallbackToHttp1x(HttpVersion version) {
this.current = new Http1x(); this.current = new Http1x();
this.version = version; this.version = version;
Expand Down
Expand Up @@ -17,6 +17,9 @@


import io.netty.channel.Channel; import io.netty.channel.Channel;


/**
* The listener is used by the {@link ConnectionProvider} to interact with the connection manager.
*/
public interface ConnectionListener<C> { public interface ConnectionListener<C> {


void onRecycle(C conn); void onRecycle(C conn);
Expand Down
Expand Up @@ -137,13 +137,6 @@ public void close() {


/** /**
* The connection queue delegates to the connection pool, the pooling strategy. * The connection queue delegates to the connection pool, the pooling strategy.
*
* - HTTP/1.x pools several connections
* - HTTP/2 uses a single connection
*
* After a queue is initialized with an HTTP/2 pool, this pool changed to an HTTP/1/1
* pool if the server does not support HTTP/2 or after negotiation. In this situation
* all waiters on this queue will use HTTP/1.1 connections.
*/ */
class ConnQueue implements ConnectionListener<C> { class ConnQueue implements ConnectionListener<C> {


Expand Down
Expand Up @@ -18,7 +18,7 @@
import io.vertx.core.impl.ContextImpl; import io.vertx.core.impl.ContextImpl;


/** /**
* The logic for the connection pool because HTTP/1 and HTTP/2 have different pooling logics. * The logic for the connection pool.
*/ */
public interface ConnectionPool<C> { public interface ConnectionPool<C> {


Expand Down
Expand Up @@ -21,6 +21,9 @@
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.impl.ContextImpl; import io.vertx.core.impl.ContextImpl;


/**
* Provides how the connection manager interacts its connections.
*/
public interface ConnectionProvider<C> { public interface ConnectionProvider<C> {


void connect( void connect(
Expand Down

0 comments on commit c7881d0

Please sign in to comment.