Skip to content

Commit

Permalink
Don't add unusupported socket options for domain sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Nov 1, 2018
1 parent a08cd8a commit ff304f0
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 46 deletions.
Expand Up @@ -129,7 +129,7 @@ private void doConnect(
bootstrap.group(context.nettyEventLoop()); bootstrap.group(context.nettyEventLoop());
bootstrap.channelFactory(client.getVertx().transport().channelFactory(false)); bootstrap.channelFactory(client.getVertx().transport().channelFactory(false));


applyConnectionOptions(bootstrap); applyConnectionOptions(false, bootstrap);


boolean useAlpn = options.isUseAlpn(); boolean useAlpn = options.isUseAlpn();


Expand Down Expand Up @@ -182,8 +182,8 @@ private void doConnect(
channelProvider.connect(SocketAddress.inetSocketAddress(port, host), SocketAddress.inetSocketAddress(port, peerHost), this.options.isForceSni() ? peerHost : null, channelHandler); channelProvider.connect(SocketAddress.inetSocketAddress(port, host), SocketAddress.inetSocketAddress(port, peerHost), this.options.isForceSni() ? peerHost : null, channelHandler);
} }


private void applyConnectionOptions(Bootstrap bootstrap) { private void applyConnectionOptions(boolean domainSocket, Bootstrap bootstrap) {
client.getVertx().transport().configure(options, bootstrap); client.getVertx().transport().configure(options, domainSocket, bootstrap);
} }


private void applyHttp2ConnectionOptions(ChannelPipeline pipeline) { private void applyHttp2ConnectionOptions(ChannelPipeline pipeline) {
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/vertx/core/http/impl/HttpServerImpl.java
Expand Up @@ -219,7 +219,7 @@ public synchronized HttpServer listen(SocketAddress address, Handler<AsyncResult
serverChannelGroup = new DefaultChannelGroup("vertx-acceptor-channels", GlobalEventExecutor.INSTANCE); serverChannelGroup = new DefaultChannelGroup("vertx-acceptor-channels", GlobalEventExecutor.INSTANCE);
ServerBootstrap bootstrap = new ServerBootstrap(); ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(vertx.getAcceptorEventLoopGroup(), availableWorkers); bootstrap.group(vertx.getAcceptorEventLoopGroup(), availableWorkers);
applyConnectionOptions(bootstrap); applyConnectionOptions(address.path() != null, bootstrap);
sslHelper.validate(vertx); sslHelper.validate(vertx);
bootstrap.childHandler(new ChannelInitializer<Channel>() { bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override @Override
Expand Down Expand Up @@ -583,8 +583,8 @@ public SSLHelper getSslHelper() {
return sslHelper; return sslHelper;
} }


private void applyConnectionOptions(ServerBootstrap bootstrap) { private void applyConnectionOptions(boolean domainSocket, ServerBootstrap bootstrap) {
vertx.transport().configure(options, bootstrap); vertx.transport().configure(options, domainSocket, bootstrap);
} }




Expand Down
7 changes: 3 additions & 4 deletions src/main/java/io/vertx/core/net/impl/NetClientImpl.java
Expand Up @@ -15,7 +15,6 @@
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.handler.logging.LoggingHandler; import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.AsyncResult; import io.vertx.core.AsyncResult;
Expand Down Expand Up @@ -149,8 +148,8 @@ private void checkClosed() {
} }
} }


private void applyConnectionOptions(Bootstrap bootstrap) { private void applyConnectionOptions(boolean domainSocket, Bootstrap bootstrap) {
vertx.transport().configure(options, bootstrap); vertx.transport().configure(options, domainSocket, bootstrap);
} }


@Override @Override
Expand Down Expand Up @@ -179,7 +178,7 @@ protected void doConnect(SocketAddress remoteAddress, String serverName, Handler
bootstrap.group(context.nettyEventLoop()); bootstrap.group(context.nettyEventLoop());
bootstrap.channelFactory(vertx.transport().channelFactory(remoteAddress.path() != null)); bootstrap.channelFactory(vertx.transport().channelFactory(remoteAddress.path() != null));


applyConnectionOptions(bootstrap); applyConnectionOptions(remoteAddress.path() != null, bootstrap);


ChannelProvider channelProvider = new ChannelProvider(bootstrap, sslHelper, sslHelper.isSSL(), context, options.getProxyOptions()); ChannelProvider channelProvider = new ChannelProvider(bootstrap, sslHelper, sslHelper.isSSL(), context, options.getProxyOptions());


Expand Down
7 changes: 4 additions & 3 deletions src/main/java/io/vertx/core/net/impl/NetServerImpl.java
Expand Up @@ -219,7 +219,7 @@ protected void initChannel(Channel ch) throws Exception {
} }
}); });


applyConnectionOptions(bootstrap); applyConnectionOptions(socketAddress.path() != null, bootstrap);


handlerManager.addHandler(new Handlers(handler, exceptionHandler), listenContext); handlerManager.addHandler(new Handlers(handler, exceptionHandler), listenContext);


Expand Down Expand Up @@ -467,10 +467,11 @@ private void executeCloseDone(ContextInternal closeContext, Handler<AsyncResult<
/** /**
* Apply the connection option to the server. * Apply the connection option to the server.
* *
* @param domainSocket whether it's a domain socket server
* @param bootstrap the Netty server bootstrap * @param bootstrap the Netty server bootstrap
*/ */
protected void applyConnectionOptions(ServerBootstrap bootstrap) { private void applyConnectionOptions(boolean domainSocket, ServerBootstrap bootstrap) {
vertx.transport().configure(options, bootstrap); vertx.transport().configure(options, domainSocket, bootstrap);
} }


@Override @Override
Expand Down
20 changes: 11 additions & 9 deletions src/main/java/io/vertx/core/net/impl/transport/EpollTransport.java
Expand Up @@ -105,16 +105,16 @@ public DatagramChannel datagramChannel(InternetProtocolFamily family) {
} }


@Override @Override
public ChannelFactory<? extends Channel> channelFactory(boolean domain) { public ChannelFactory<? extends Channel> channelFactory(boolean domainSocket) {
if (domain) { if (domainSocket) {
return EpollDomainSocketChannel::new; return EpollDomainSocketChannel::new;
} else { } else {
return EpollSocketChannel::new; return EpollSocketChannel::new;
} }
} }


public ChannelFactory<? extends ServerChannel> serverChannelFactory(boolean domain) { public ChannelFactory<? extends ServerChannel> serverChannelFactory(boolean domainSocket) {
if (domain) { if (domainSocket) {
return EpollServerDomainSocketChannel::new; return EpollServerDomainSocketChannel::new;
} }
return EpollServerSocketChannel::new; return EpollServerSocketChannel::new;
Expand All @@ -127,23 +127,25 @@ public void configure(DatagramChannel channel, DatagramSocketOptions options) {
} }


@Override @Override
public void configure(NetServerOptions options, ServerBootstrap bootstrap) { public void configure(NetServerOptions options, boolean domainSocket, ServerBootstrap bootstrap) {
bootstrap.option(EpollChannelOption.SO_REUSEPORT, options.isReusePort()); if (!domainSocket) {
bootstrap.option(EpollChannelOption.SO_REUSEPORT, options.isReusePort());
}
if (options.isTcpFastOpen()) { if (options.isTcpFastOpen()) {
bootstrap.option(EpollChannelOption.TCP_FASTOPEN, options.isTcpFastOpen() ? pendingFastOpenRequestsThreshold : 0); bootstrap.option(EpollChannelOption.TCP_FASTOPEN, options.isTcpFastOpen() ? pendingFastOpenRequestsThreshold : 0);
} }
bootstrap.childOption(EpollChannelOption.TCP_QUICKACK, options.isTcpQuickAck()); bootstrap.childOption(EpollChannelOption.TCP_QUICKACK, options.isTcpQuickAck());
bootstrap.childOption(EpollChannelOption.TCP_CORK, options.isTcpCork()); bootstrap.childOption(EpollChannelOption.TCP_CORK, options.isTcpCork());
super.configure(options, bootstrap); super.configure(options, domainSocket, bootstrap);
} }


@Override @Override
public void configure(ClientOptionsBase options, Bootstrap bootstrap) { public void configure(ClientOptionsBase options, boolean domainSocket, Bootstrap bootstrap) {
if (options.isTcpFastOpen()) { if (options.isTcpFastOpen()) {
bootstrap.option(EpollChannelOption.TCP_FASTOPEN_CONNECT, options.isTcpFastOpen()); bootstrap.option(EpollChannelOption.TCP_FASTOPEN_CONNECT, options.isTcpFastOpen());
} }
bootstrap.option(EpollChannelOption.TCP_QUICKACK, options.isTcpQuickAck()); bootstrap.option(EpollChannelOption.TCP_QUICKACK, options.isTcpQuickAck());
bootstrap.option(EpollChannelOption.TCP_CORK, options.isTcpCork()); bootstrap.option(EpollChannelOption.TCP_CORK, options.isTcpCork());
super.configure(options, bootstrap); super.configure(options, domainSocket, bootstrap);
} }
} }
Expand Up @@ -76,27 +76,29 @@ public DatagramChannel datagramChannel(InternetProtocolFamily family) {
} }


@Override @Override
public ChannelFactory<? extends Channel> channelFactory(boolean domain) { public ChannelFactory<? extends Channel> channelFactory(boolean domainSocket) {
if (domain) { if (domainSocket) {
return KQueueDomainSocketChannel::new; return KQueueDomainSocketChannel::new;
} else { } else {
return KQueueSocketChannel::new; return KQueueSocketChannel::new;
} }
} }


@Override @Override
public ChannelFactory<? extends ServerChannel> serverChannelFactory(boolean domain) { public ChannelFactory<? extends ServerChannel> serverChannelFactory(boolean domainSocket) {
if (domain) { if (domainSocket) {
return KQueueServerDomainSocketChannel::new; return KQueueServerDomainSocketChannel::new;
} else { } else {
return KQueueServerSocketChannel::new; return KQueueServerSocketChannel::new;
} }
} }


@Override @Override
public void configure(NetServerOptions options, ServerBootstrap bootstrap) { public void configure(NetServerOptions options, boolean domainSocket, ServerBootstrap bootstrap) {
bootstrap.option(KQueueChannelOption.SO_REUSEPORT, options.isReusePort()); if (!domainSocket) {
super.configure(options, bootstrap); bootstrap.option(KQueueChannelOption.SO_REUSEPORT, options.isReusePort());
}
super.configure(options, domainSocket, bootstrap);
} }


@Override @Override
Expand Down
32 changes: 18 additions & 14 deletions src/main/java/io/vertx/core/net/impl/transport/Transport.java
Expand Up @@ -157,21 +157,21 @@ public DatagramChannel datagramChannel(InternetProtocolFamily family) {


/** /**
* @return the type for channel * @return the type for channel
* @param domain whether to create a unix domain channel or a socket channel * @param domainSocket whether to create a unix domain channel or a socket channel
*/ */
public ChannelFactory<? extends Channel> channelFactory(boolean domain) { public ChannelFactory<? extends Channel> channelFactory(boolean domainSocket) {
if (domain) { if (domainSocket) {
throw new IllegalArgumentException(); throw new IllegalArgumentException();
} }
return NioSocketChannel::new; return NioSocketChannel::new;
} }


/** /**
* @return the type for server channel * @return the type for server channel
* @param domain whether to create a server unix domain channel or a regular server socket channel * @param domainSocket whether to create a server unix domain channel or a regular server socket channel
*/ */
public ChannelFactory<? extends ServerChannel> serverChannelFactory(boolean domain) { public ChannelFactory<? extends ServerChannel> serverChannelFactory(boolean domainSocket) {
if (domain) { if (domainSocket) {
throw new IllegalArgumentException(); throw new IllegalArgumentException();
} }
return NioServerSocketChannel::new; return NioServerSocketChannel::new;
Expand Down Expand Up @@ -206,11 +206,15 @@ public void configure(DatagramChannel channel, DatagramSocketOptions options) {
} }
} }


public void configure(ClientOptionsBase options, Bootstrap bootstrap) { public void configure(ClientOptionsBase options, boolean domainSocket, Bootstrap bootstrap) {
if (!domainSocket) {
bootstrap.option(ChannelOption.SO_REUSEADDR, options.isReuseAddress());
bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
}
if (options.getLocalAddress() != null) { if (options.getLocalAddress() != null) {
bootstrap.localAddress(options.getLocalAddress(), 0); bootstrap.localAddress(options.getLocalAddress(), 0);
} }
bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
if (options.getSendBufferSize() != -1) { if (options.getSendBufferSize() != -1) {
bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
} }
Expand All @@ -226,12 +230,14 @@ public void configure(ClientOptionsBase options, Bootstrap bootstrap) {
} }
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
bootstrap.option(ChannelOption.SO_REUSEADDR, options.isReuseAddress());
} }


public void configure(NetServerOptions options, ServerBootstrap bootstrap) { public void configure(NetServerOptions options, boolean domainSocket, ServerBootstrap bootstrap) {
bootstrap.childOption(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); bootstrap.option(ChannelOption.SO_REUSEADDR, options.isReuseAddress());
if (!domainSocket) {
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
bootstrap.childOption(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
}
if (options.getSendBufferSize() != -1) { if (options.getSendBufferSize() != -1) {
bootstrap.childOption(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); bootstrap.childOption(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
} }
Expand All @@ -246,8 +252,6 @@ public void configure(NetServerOptions options, ServerBootstrap bootstrap) {
bootstrap.childOption(ChannelOption.IP_TOS, options.getTrafficClass()); bootstrap.childOption(ChannelOption.IP_TOS, options.getTrafficClass());
} }
bootstrap.childOption(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); bootstrap.childOption(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
bootstrap.option(ChannelOption.SO_REUSEADDR, options.isReuseAddress());
if (options.getAcceptBacklog() != -1) { if (options.getAcceptBacklog() != -1) {
bootstrap.option(ChannelOption.SO_BACKLOG, options.getAcceptBacklog()); bootstrap.option(ChannelOption.SO_BACKLOG, options.getAcceptBacklog());
} }
Expand Down
Expand Up @@ -56,7 +56,7 @@ private void testConnectErrorNotifiesOnEventLoop(NetClientOptions options) {
RuntimeException cause = new RuntimeException(); RuntimeException cause = new RuntimeException();
vertx = VertxImpl.vertx(new VertxOptions(), new Transport() { vertx = VertxImpl.vertx(new VertxOptions(), new Transport() {
@Override @Override
public ChannelFactory<? extends Channel> channelFactory(boolean domain) { public ChannelFactory<? extends Channel> channelFactory(boolean domainSocket) {
return (ChannelFactory<Channel>) () -> { return (ChannelFactory<Channel>) () -> {
throw cause; throw cause;
}; };
Expand All @@ -79,7 +79,7 @@ public void testNetBindError() {
RuntimeException cause = new RuntimeException(); RuntimeException cause = new RuntimeException();
vertx = VertxImpl.vertx(new VertxOptions(), new Transport() { vertx = VertxImpl.vertx(new VertxOptions(), new Transport() {
@Override @Override
public ChannelFactory<? extends ServerChannel> serverChannelFactory(boolean domain) { public ChannelFactory<? extends ServerChannel> serverChannelFactory(boolean domainSocket) {
return (ChannelFactory<ServerChannel>) () -> { return (ChannelFactory<ServerChannel>) () -> {
throw cause; throw cause;
}; };
Expand All @@ -99,7 +99,7 @@ public void testHttpBindError() {
RuntimeException cause = new RuntimeException(); RuntimeException cause = new RuntimeException();
vertx = VertxImpl.vertx(new VertxOptions(), new Transport() { vertx = VertxImpl.vertx(new VertxOptions(), new Transport() {
@Override @Override
public ChannelFactory<? extends ServerChannel> serverChannelFactory(boolean domain) { public ChannelFactory<? extends ServerChannel> serverChannelFactory(boolean domainSocket) {
return (ChannelFactory<ServerChannel>) () -> { return (ChannelFactory<ServerChannel>) () -> {
throw cause; throw cause;
}; };
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/io/vertx/core/net/NetTest.java
Expand Up @@ -1720,6 +1720,8 @@ public void testSharedServersRoundRobin() throws Exception {


int numServers = VertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE / 2- 1; int numServers = VertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE / 2- 1;
int numConnections = numServers * 20; int numConnections = numServers * 20;
System.out.println("numServers = " + numServers);
System.out.println("numConnections = " + numConnections);


List<NetServer> servers = new ArrayList<>(); List<NetServer> servers = new ArrayList<>();
Set<NetServer> connectedServers = new ConcurrentHashSet<>(); Set<NetServer> connectedServers = new ConcurrentHashSet<>();
Expand Down

0 comments on commit ff304f0

Please sign in to comment.