Skip to content

Commit

Permalink
Support domain socket for HTTP client
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed May 1, 2019
1 parent 64ee027 commit f38c2ec
Show file tree
Hide file tree
Showing 24 changed files with 1,025 additions and 857 deletions.
2 changes: 1 addition & 1 deletion src/main/asciidoc/http.adoc
Expand Up @@ -1550,7 +1550,7 @@ method.


The {@link io.vertx.core.http.HttpClientOptions#setSsl(boolean)} setting acts as the default client setting. The {@link io.vertx.core.http.HttpClientOptions#setSsl(boolean)} setting acts as the default client setting.


The {@link io.vertx.core.http.RequestOptions#setSsl(boolean)} overrides the default client setting The {@link io.vertx.core.http.RequestOptions#setSsl(Boolean)} overrides the default client setting


* setting the value to `false` will disable SSL/TLS even if the client is configured to use SSL/TLS * setting the value to `false` will disable SSL/TLS even if the client is configured to use SSL/TLS
* setting the value to `true` will enable SSL/TLS even if the client is configured to not use SSL/TLS, the actual * setting the value to `true` will enable SSL/TLS even if the client is configured to not use SSL/TLS, the actual
Expand Down
19 changes: 12 additions & 7 deletions src/main/asciidoc/index.adoc
Expand Up @@ -1449,28 +1449,33 @@ Native on BSD gives you extra networking options:


=== Domain sockets === Domain sockets


Natives provide domain sockets support for `NetServer` and `HttpServer`: Natives provide domain sockets support for servers:


[source,$lang] [source,$lang]
---- ----
{@link examples.CoreExamples#serverWithDomainSockets} {@link examples.CoreExamples#tcpServerWithDomainSockets}
---- ----


Or for http: or for http:

[source,$lang] [source,$lang]
---- ----
{@link examples.CoreExamples#httpServerWithDomainSockets} {@link examples.CoreExamples#httpServerWithDomainSockets}
---- ----



As well as clients:
As well as `NetClient`:


[source,$lang] [source,$lang]
---- ----
{@link examples.CoreExamples#clientWithDomainSockets} {@link examples.CoreExamples#tcpClientWithDomainSockets}
---- ----


NOTE: support for `HttpClient` can be expected in later versions of Vert.x or for http:

[source,$lang]
----
{@link examples.CoreExamples#httpClientWithDomainSockets}
----


== Security notes == Security notes


Expand Down
23 changes: 20 additions & 3 deletions src/main/java/examples/CoreExamples.java
Expand Up @@ -15,6 +15,8 @@
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.dns.AddressResolverOptions; import io.vertx.core.dns.AddressResolverOptions;
import io.vertx.core.file.FileSystem; import io.vertx.core.file.FileSystem;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions; import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerRequest;
Expand Down Expand Up @@ -392,7 +394,7 @@ public void configureBSDOptions(Vertx vertx, boolean reusePort) {
vertx.createHttpServer(new HttpServerOptions().setReusePort(reusePort)); vertx.createHttpServer(new HttpServerOptions().setReusePort(reusePort));
} }


public void serverWithDomainSockets(Vertx vertx) { public void tcpServerWithDomainSockets(Vertx vertx) {
// Only available on BSD and Linux // Only available on BSD and Linux
vertx.createNetServer().connectHandler(so -> { vertx.createNetServer().connectHandler(so -> {
// Handle application // Handle application
Expand All @@ -411,16 +413,31 @@ public void httpServerWithDomainSockets(Vertx vertx) {
}); });
} }


public void clientWithDomainSockets(Vertx vertx) { public void tcpClientWithDomainSockets(Vertx vertx) {
NetClient netClient = vertx.createNetClient(); NetClient netClient = vertx.createNetClient();


// Only available on BSD and Linux // Only available on BSD and Linux
netClient.connect(SocketAddress.domainSocketAddress("/var/tmp/myservice.sock"), ar -> { SocketAddress addr = SocketAddress.domainSocketAddress("/var/tmp/myservice.sock");

// Connect to the server
netClient.connect(addr, ar -> {
if (ar.succeeded()) { if (ar.succeeded()) {
// Connected // Connected
} else { } else {
ar.cause().printStackTrace(); ar.cause().printStackTrace();
} }
}); });
} }

public void httpClientWithDomainSockets(Vertx vertx) {
HttpClient httpClient = vertx.createHttpClient();

// Only available on BSD and Linux
SocketAddress addr = SocketAddress.domainSocketAddress("/var/tmp/myservice.sock");

// Send request to the server
httpClient.request(HttpMethod.GET, addr, 8080, "localhost", "/", resp -> {
// Process response
}).end();
}
} }
61 changes: 61 additions & 0 deletions src/main/java/io/vertx/core/http/HttpClient.java
Expand Up @@ -19,6 +19,7 @@
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.MultiMap; import io.vertx.core.MultiMap;
import io.vertx.core.metrics.Measured; import io.vertx.core.metrics.Measured;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.ReadStream;


import java.util.function.Function; import java.util.function.Function;
Expand Down Expand Up @@ -54,6 +55,16 @@
@VertxGen @VertxGen
public interface HttpClient extends Measured { public interface HttpClient extends Measured {


/**
* Like {@link #request(HttpMethod, RequestOptions)} using the {@code serverAddress} parameter to connect to the
* server instead of the {@code absoluteURI} parameter.
* <p>
* The request host header will still be created from the {@code options} parameter.
* <p>
* Use {@link SocketAddress#domainSocketAddress(String)} to connect to a unix domain socket server.
*/
HttpClientRequest request(HttpMethod method, SocketAddress serverAddress, RequestOptions options);

/** /**
* Create an HTTP request to send to the server with the specified options. * Create an HTTP request to send to the server with the specified options.
* *
Expand All @@ -73,6 +84,16 @@ public interface HttpClient extends Measured {
*/ */
HttpClientRequest request(HttpMethod method, int port, String host, String requestURI); HttpClientRequest request(HttpMethod method, int port, String host, String requestURI);


/**
* Like {@link #request(HttpMethod, int, String, String)} using the {@code serverAddress} parameter to connect to the
* server instead of the {@code absoluteURI} parameter.
* <p>
* The request host header will still be created from the {@code host} and {@code port} parameters.
* <p>
* Use {@link SocketAddress#domainSocketAddress(String)} to connect to a unix domain socket server.
*/
HttpClientRequest request(HttpMethod method, SocketAddress serverAddress, int port, String host, String requestURI);

/** /**
* Create an HTTP request to send to the server at the specified host and default port. * Create an HTTP request to send to the server at the specified host and default port.
* @param method the HTTP method * @param method the HTTP method
Expand All @@ -91,6 +112,16 @@ public interface HttpClient extends Measured {
*/ */
HttpClientRequest request(HttpMethod method, RequestOptions options, Handler<AsyncResult<HttpClientResponse>> responseHandler); HttpClientRequest request(HttpMethod method, RequestOptions options, Handler<AsyncResult<HttpClientResponse>> responseHandler);


/**
* Like {@link #request(HttpMethod, RequestOptions, Handler)} using the {@code serverAddress} parameter to connect to the
* server instead of the {@code absoluteURI} parameter.
* <p>
* The request host header will still be created from the {@code options} parameter.
* <p>
* Use {@link SocketAddress#domainSocketAddress(String)} to connect to a unix domain socket server.
*/
HttpClientRequest request(HttpMethod method, SocketAddress serverAddress, RequestOptions options, Handler<AsyncResult<HttpClientResponse>> responseHandler);

/** /**
* Create an HTTP request to send to the server at the specified host and port, specifying a response handler to receive * Create an HTTP request to send to the server at the specified host and port, specifying a response handler to receive
* the response * the response
Expand All @@ -103,6 +134,16 @@ public interface HttpClient extends Measured {
*/ */
HttpClientRequest request(HttpMethod method, int port, String host, String requestURI, Handler<AsyncResult<HttpClientResponse>> responseHandler); HttpClientRequest request(HttpMethod method, int port, String host, String requestURI, Handler<AsyncResult<HttpClientResponse>> responseHandler);


/**
* Like {@link #request(HttpMethod, int, String, String, Handler)} using the {@code serverAddress} parameter to connect to the
* server instead of the {@code absoluteURI} parameter.
* <p>
* The request host header will still be created from the {@code host} and {@code port} parameters.
* <p>
* Use {@link SocketAddress#domainSocketAddress(String)} to connect to a unix domain socket server.
*/
HttpClientRequest request(HttpMethod method, SocketAddress serverAddress, int port, String host, String requestURI, Handler<AsyncResult<HttpClientResponse>> responseHandler);

/** /**
* Create an HTTP request to send to the server at the specified host and default port, specifying a response handler to receive * Create an HTTP request to send to the server at the specified host and default port, specifying a response handler to receive
* the response * the response
Expand Down Expand Up @@ -140,6 +181,16 @@ public interface HttpClient extends Measured {
*/ */
HttpClientRequest requestAbs(HttpMethod method, String absoluteURI); HttpClientRequest requestAbs(HttpMethod method, String absoluteURI);


/**
* Like {@link #requestAbs(HttpMethod, String)} using the {@code serverAddress} parameter to connect to the
* server instead of the {@code absoluteURI} parameter.
* <p>
* The request host header will still be created from the {@code absoluteURI} parameter.
* <p>
* Use {@link SocketAddress#domainSocketAddress(String)} to connect to a unix domain socket server.
*/
HttpClientRequest requestAbs(HttpMethod method, SocketAddress serverAddress, String absoluteURI);

/** /**
* Create an HTTP request to send to the server using an absolute URI, specifying a response handler to receive * Create an HTTP request to send to the server using an absolute URI, specifying a response handler to receive
* the response * the response
Expand All @@ -150,6 +201,16 @@ public interface HttpClient extends Measured {
*/ */
HttpClientRequest requestAbs(HttpMethod method, String absoluteURI, Handler<AsyncResult<HttpClientResponse>> responseHandler); HttpClientRequest requestAbs(HttpMethod method, String absoluteURI, Handler<AsyncResult<HttpClientResponse>> responseHandler);


/**
* Like {@link #requestAbs(HttpMethod, String, Handler)} using the {@code serverAddress} parameter to connect to the
* server instead of the {@code absoluteURI} parameter.
* <p>
* The request host header will still be created from the {@code absoluteURI} parameter.
* <p>
* Use {@link SocketAddress#domainSocketAddress(String)} to connect to a unix domain socket server.
*/
HttpClientRequest requestAbs(HttpMethod method, SocketAddress serverAddress, String absoluteURI, Handler<AsyncResult<HttpClientResponse>> responseHandler);

/** /**
* Create an HTTP GET request to send to the server with the specified options. * Create an HTTP GET request to send to the server with the specified options.
* @param options the request options * @param options the request options
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/vertx/core/http/RequestOptions.java
Expand Up @@ -35,7 +35,7 @@ public class RequestOptions {
/** /**
* SSL enabled by default = false * SSL enabled by default = false
*/ */
public static final boolean DEFAULT_SSL = false; public static final Boolean DEFAULT_SSL = null;


/** /**
* The default relative request URI = "" * The default relative request URI = ""
Expand All @@ -44,7 +44,7 @@ public class RequestOptions {


private String host; private String host;
private int port; private int port;
private boolean ssl; private Boolean ssl;
private String uri; private String uri;


/** /**
Expand Down Expand Up @@ -123,7 +123,7 @@ public RequestOptions setPort(int port) {
/** /**
* @return is SSL/TLS enabled? * @return is SSL/TLS enabled?
*/ */
public boolean isSsl() { public Boolean isSsl() {
return ssl; return ssl;
} }


Expand All @@ -133,7 +133,7 @@ public boolean isSsl() {
* @param ssl true if enabled * @param ssl true if enabled
* @return a reference to this, so the API can be used fluently * @return a reference to this, so the API can be used fluently
*/ */
public RequestOptions setSsl(boolean ssl) { public RequestOptions setSsl(Boolean ssl) {
this.ssl = ssl; this.ssl = ssl;
return this; return this;
} }
Expand Down
41 changes: 24 additions & 17 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -18,6 +18,7 @@
import io.vertx.core.http.HttpVersion; import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.pool.Pool; import io.vertx.core.http.impl.pool.Pool;
import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.metrics.HttpClientMetrics; import io.vertx.core.spi.metrics.HttpClientMetrics;


import java.util.*; import java.util.*;
Expand Down Expand Up @@ -66,37 +67,34 @@ private synchronized void checkExpired(long period) {
private static final class EndpointKey { private static final class EndpointKey {


private final boolean ssl; private final boolean ssl;
private final int port; private final SocketAddress server;
private final String peerHost; private final SocketAddress peerAddress;
private final String host;


EndpointKey(boolean ssl, int port, String peerHost, String host) { EndpointKey(boolean ssl, SocketAddress server, SocketAddress peerAddress) {
if (host == null) { if (server == null) {
throw new NullPointerException("No null host"); throw new NullPointerException("No null host");
} }
if (peerHost == null) { if (peerAddress == null) {
throw new NullPointerException("No null peer host"); throw new NullPointerException("No null peer address");
} }
this.ssl = ssl; this.ssl = ssl;
this.peerHost = peerHost; this.peerAddress = peerAddress;
this.host = host; this.server = server;
this.port = port;
} }


@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
EndpointKey that = (EndpointKey) o; EndpointKey that = (EndpointKey) o;
return ssl == that.ssl && port == that.port && peerHost.equals(that.peerHost) && host.equals(that.host); return ssl == that.ssl && server.equals(that.server) && peerAddress.equals(that.peerAddress);
} }


@Override @Override
public int hashCode() { public int hashCode() {
int result = ssl ? 1 : 0; int result = ssl ? 1 : 0;
result = 31 * result + peerHost.hashCode(); result = 31 * result + peerAddress.hashCode();
result = 31 * result + host.hashCode(); result = 31 * result + server.hashCode();
result = 31 * result + port;
return result; return result;
} }
} }
Expand All @@ -112,13 +110,22 @@ public Endpoint(Pool<HttpClientConnection> pool, Object metric) {
} }
} }


void getConnection(ContextInternal ctx, String peerHost, boolean ssl, int port, String host, Handler<AsyncResult<HttpClientConnection>> handler) { void getConnection(ContextInternal ctx, SocketAddress peerAddress, boolean ssl, SocketAddress server, Handler<AsyncResult<HttpClientConnection>> handler) {
EndpointKey key = new EndpointKey(ssl, port, peerHost, host); EndpointKey key = new EndpointKey(ssl, server, peerAddress);
while (true) { while (true) {
Endpoint endpoint = endpointMap.computeIfAbsent(key, targetAddress -> { Endpoint endpoint = endpointMap.computeIfAbsent(key, targetAddress -> {
int maxPoolSize = Math.max(client.getOptions().getMaxPoolSize(), client.getOptions().getHttp2MaxPoolSize()); int maxPoolSize = Math.max(client.getOptions().getMaxPoolSize(), client.getOptions().getHttp2MaxPoolSize());
String host;
int port;
if (server.path() == null) {
host = server.host();
port = server.port();
} else {
host = server.path();
port = 0;
}
Object metric = metrics != null ? metrics.createEndpoint(host, port, maxPoolSize) : null; Object metric = metrics != null ? metrics.createEndpoint(host, port, maxPoolSize) : null;
HttpChannelConnector connector = new HttpChannelConnector(client, metric, version, ssl, peerHost, host, port); HttpChannelConnector connector = new HttpChannelConnector(client, metric, version, ssl, peerAddress, server);
Pool<HttpClientConnection> pool = new Pool<>(ctx, connector, maxWaitQueueSize, connector.weight(), maxSize, Pool<HttpClientConnection> pool = new Pool<>(ctx, connector, maxWaitQueueSize, connector.weight(), maxSize,
v -> { v -> {
if (metrics != null) { if (metrics != null) {
Expand Down
Expand Up @@ -35,6 +35,7 @@
import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetSocket; import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.ConnectionBase; import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.net.impl.NetSocketImpl; import io.vertx.core.net.impl.NetSocketImpl;
import io.vertx.core.net.impl.VertxHandler; import io.vertx.core.net.impl.VertxHandler;
Expand Down Expand Up @@ -67,8 +68,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
private final HttpClientImpl client; private final HttpClientImpl client;
private final HttpClientOptions options; private final HttpClientOptions options;
private final boolean ssl; private final boolean ssl;
private final String host; private final SocketAddress server;
private final int port;
private final Object endpointMetric; private final Object endpointMetric;
private final HttpClientMetrics metrics; private final HttpClientMetrics metrics;
private final HttpVersion version; private final HttpVersion version;
Expand All @@ -89,17 +89,15 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
Object endpointMetric, Object endpointMetric,
ChannelHandlerContext channel, ChannelHandlerContext channel,
boolean ssl, boolean ssl,
String host, SocketAddress server,
int port,
ContextInternal context, ContextInternal context,
HttpClientMetrics metrics) { HttpClientMetrics metrics) {
super(client.getVertx(), channel, context); super(client.getVertx(), channel, context);
this.listener = listener; this.listener = listener;
this.client = client; this.client = client;
this.options = client.getOptions(); this.options = client.getOptions();
this.ssl = ssl; this.ssl = ssl;
this.host = host; this.server = server;
this.port = port;
this.metrics = metrics; this.metrics = metrics;
this.version = version; this.version = version;
this.endpointMetric = endpointMetric; this.endpointMetric = endpointMetric;
Expand Down Expand Up @@ -678,7 +676,7 @@ synchronized void toWebSocket(String requestURI, MultiMap headers, WebsocketVers
URI wsuri = new URI(requestURI); URI wsuri = new URI(requestURI);
if (!wsuri.isAbsolute()) { if (!wsuri.isAbsolute()) {
// Netty requires an absolute url // Netty requires an absolute url
wsuri = new URI((ssl ? "https:" : "http:") + "//" + host + ":" + port + requestURI); wsuri = new URI((ssl ? "https:" : "http:") + "//" + server.host() + ":" + server.port() + requestURI);
} }
WebSocketVersion version = WebSocketVersion version =
WebSocketVersion.valueOf((vers == null ? WebSocketVersion.valueOf((vers == null ?
Expand Down
13 changes: 11 additions & 2 deletions src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
Expand Up @@ -174,10 +174,19 @@ public synchronized void onPushPromiseRead(ChannelHandlerContext ctx, int stream
String rawMethod = headers.method().toString(); String rawMethod = headers.method().toString();
HttpMethod method = HttpUtils.toVertxMethod(rawMethod); HttpMethod method = HttpUtils.toVertxMethod(rawMethod);
String uri = headers.path().toString(); String uri = headers.path().toString();
String host = headers.authority() != null ? headers.authority().toString() : null; String authority = headers.authority() != null ? headers.authority().toString() : null;
MultiMap headersMap = new Http2HeadersAdaptor(headers); MultiMap headersMap = new Http2HeadersAdaptor(headers);
Http2Stream promisedStream = handler.connection().stream(promisedStreamId); Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
int port = remoteAddress().port(); int pos = authority.indexOf(':');
int port;
String host;
if (pos == -1) {
host = authority;
port = 80;
} else {
host = authority.substring(0, pos);
port = Integer.parseInt(authority.substring(pos + 1));
}
HttpClientRequestPushPromise pushReq = new HttpClientRequestPushPromise(this, promisedStream, client, isSsl(), method, rawMethod, uri, host, port, headersMap); HttpClientRequestPushPromise pushReq = new HttpClientRequestPushPromise(this, promisedStream, client, isSsl(), method, rawMethod, uri, host, port, headersMap);
if (metrics != null) { if (metrics != null) {
pushReq.metric(metrics.responsePushed(queueMetric, metric(), localAddress(), remoteAddress(), pushReq)); pushReq.metric(metrics.responsePushed(queueMetric, metric(), localAddress(), remoteAddress(), pushReq));
Expand Down

0 comments on commit f38c2ec

Please sign in to comment.