Skip to content

Commit

Permalink
When a AsyncResolveBindConnectHelper is already succeeded the listene…
Browse files Browse the repository at this point in the history
…r callback should be on the channel event loop
  • Loading branch information
vietj committed May 8, 2016
1 parent 28ae4dd commit ce91592
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 28 deletions.
5 changes: 2 additions & 3 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -18,7 +18,6 @@


import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
Expand Down Expand Up @@ -276,10 +275,10 @@ public void connect(VertxInternal vertx, Bootstrap bootstrap, HttpClientOptions
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
} }
}); });
AsyncResolveBindConnectHelper<ChannelFuture> future = AsyncResolveBindConnectHelper.doConnect(vertx, port, host, bootstrap); AsyncResolveBindConnectHelper future = AsyncResolveBindConnectHelper.doConnect(vertx, port, host, bootstrap);
future.addListener(res -> { future.addListener(res -> {
if (res.succeeded()) { if (res.succeeded()) {
channelHandler.handle(Future.succeededFuture(res.result().channel())); channelHandler.handle(Future.succeededFuture(res.result()));
} else { } else {
channelHandler.handle(Future.failedFuture(res.cause())); channelHandler.handle(Future.failedFuture(res.cause()));
} }
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/http/impl/HttpServerImpl.java
Expand Up @@ -101,7 +101,7 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {


private ChannelGroup serverChannelGroup; private ChannelGroup serverChannelGroup;
private volatile boolean listening; private volatile boolean listening;
private AsyncResolveBindConnectHelper<ChannelFuture> bindFuture; private AsyncResolveBindConnectHelper bindFuture;
private ServerID id; private ServerID id;
private HttpServerImpl actualServer; private HttpServerImpl actualServer;
private volatile int actualPort; private volatile int actualPort;
Expand Down Expand Up @@ -255,7 +255,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) thr
if (res.failed()) { if (res.failed()) {
vertx.sharedHttpServers().remove(id); vertx.sharedHttpServers().remove(id);
} else { } else {
Channel serverChannel = res.result().channel(); Channel serverChannel = res.result();
HttpServerImpl.this.actualPort = ((InetSocketAddress)serverChannel.localAddress()).getPort(); HttpServerImpl.this.actualPort = ((InetSocketAddress)serverChannel.localAddress()).getPort();
serverChannelGroup.add(serverChannel); serverChannelGroup.add(serverChannel);
metrics = vertx.metricsSPI().createMetrics(this, new SocketAddressImpl(port, host), options); metrics = vertx.metricsSPI().createMetrics(this, new SocketAddressImpl(port, host), options);
Expand Down
Expand Up @@ -18,6 +18,7 @@


import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.vertx.core.AsyncResult; import io.vertx.core.AsyncResult;
import io.vertx.core.Future; import io.vertx.core.Future;
Expand All @@ -32,27 +33,30 @@
/** /**
* @author <a href="http://tfox.org">Tim Fox</a> * @author <a href="http://tfox.org">Tim Fox</a>
*/ */
public class AsyncResolveBindConnectHelper<T> implements Handler<AsyncResult<T>> { public class AsyncResolveBindConnectHelper {


private List<Handler<AsyncResult<T>>> handlers = new ArrayList<>(); private List<Handler<AsyncResult<Channel>>> handlers = new ArrayList<>();
private boolean complete; private ChannelFuture future;
private AsyncResult<T> result; private AsyncResult<Channel> result;


public synchronized void addListener(Handler<AsyncResult<T>> handler) { public synchronized void addListener(Handler<AsyncResult<Channel>> handler) {
if (complete) { if (result != null) {
handler.handle(result); if (future != null) {
future.addListener(v -> handler.handle(result));
} else {
handler.handle(result);
}
} else { } else {
handlers.add(handler); handlers.add(handler);
} }
} }


@Override private synchronized void handle(ChannelFuture cf, AsyncResult<Channel> res) {
public synchronized void handle(AsyncResult<T> res) { if (result == null) {
if (!complete) { for (Handler<AsyncResult<Channel>> handler: handlers) {
for (Handler<AsyncResult<T>> handler: handlers) {
handler.handle(res); handler.handle(res);
} }
complete = true; future = cf;
result = res; result = res;
} else { } else {
throw new IllegalStateException("Already complete!"); throw new IllegalStateException("Already complete!");
Expand All @@ -65,35 +69,35 @@ private static void checkPort(int port) {
} }
} }


public static AsyncResolveBindConnectHelper<ChannelFuture> doBind(VertxInternal vertx, int port, String host, public static AsyncResolveBindConnectHelper doBind(VertxInternal vertx, int port, String host,
ServerBootstrap bootstrap) { ServerBootstrap bootstrap) {
return doBindConnect(vertx, port, host, bootstrap::bind); return doBindConnect(vertx, port, host, bootstrap::bind);
} }


public static AsyncResolveBindConnectHelper<ChannelFuture> doConnect(VertxInternal vertx, int port, String host, public static AsyncResolveBindConnectHelper doConnect(VertxInternal vertx, int port, String host,
Bootstrap bootstrap) { Bootstrap bootstrap) {
return doBindConnect(vertx, port, host, bootstrap::connect); return doBindConnect(vertx, port, host, bootstrap::connect);
} }


private static AsyncResolveBindConnectHelper<ChannelFuture> doBindConnect(VertxInternal vertx, int port, String host, private static AsyncResolveBindConnectHelper doBindConnect(VertxInternal vertx, int port, String host,
Function<InetSocketAddress, Function<InetSocketAddress,
ChannelFuture> cfProducer) { ChannelFuture> cfProducer) {
checkPort(port); checkPort(port);
AsyncResolveBindConnectHelper<ChannelFuture> asyncResolveBindConnectHelper = new AsyncResolveBindConnectHelper<>(); AsyncResolveBindConnectHelper asyncResolveBindConnectHelper = new AsyncResolveBindConnectHelper();
vertx.resolveHostname(host, res -> { vertx.resolveHostname(host, res -> {
if (res.succeeded()) { if (res.succeeded()) {
// At this point the name is an IP address so there will be no resolve hit // At this point the name is an IP address so there will be no resolve hit
InetSocketAddress t = new InetSocketAddress(res.result(), port); InetSocketAddress t = new InetSocketAddress(res.result(), port);
ChannelFuture future = cfProducer.apply(t); ChannelFuture future = cfProducer.apply(t);
future.addListener(f -> { future.addListener(f -> {
if (f.isSuccess()) { if (f.isSuccess()) {
asyncResolveBindConnectHelper.handle(Future.succeededFuture(future)); asyncResolveBindConnectHelper.handle(future, Future.succeededFuture(future.channel()));
} else { } else {
asyncResolveBindConnectHelper.handle(Future.failedFuture(f.cause())); asyncResolveBindConnectHelper.handle(future, Future.failedFuture(f.cause()));
} }
}); });
} else { } else {
asyncResolveBindConnectHelper.handle(Future.failedFuture(res.cause())); asyncResolveBindConnectHelper.handle(null, Future.failedFuture(res.cause()));
} }
}); });
return asyncResolveBindConnectHelper; return asyncResolveBindConnectHelper;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/net/impl/NetClientImpl.java
Expand Up @@ -171,12 +171,12 @@ protected void initChannel(Channel ch) throws Exception {
}); });


applyConnectionOptions(bootstrap); applyConnectionOptions(bootstrap);
AsyncResolveBindConnectHelper<ChannelFuture> future = AsyncResolveBindConnectHelper.doConnect(vertx, port, host, bootstrap); AsyncResolveBindConnectHelper future = AsyncResolveBindConnectHelper.doConnect(vertx, port, host, bootstrap);
future.addListener(res -> { future.addListener(res -> {


if (res.succeeded()) { if (res.succeeded()) {


Channel ch = res.result().channel(); Channel ch = res.result();


if (sslHelper.isSSL()) { if (sslHelper.isSSL()) {
// TCP connected, so now we must do the SSL handshake // TCP connected, so now we must do the SSL handshake
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/net/impl/NetServerImpl.java
Expand Up @@ -72,7 +72,7 @@ public class NetServerImpl implements NetServer, Closeable, MetricsProvider {
private volatile boolean listening; private volatile boolean listening;
private volatile ServerID id; private volatile ServerID id;
private NetServerImpl actualServer; private NetServerImpl actualServer;
private AsyncResolveBindConnectHelper<ChannelFuture> bindFuture; private AsyncResolveBindConnectHelper bindFuture;
private volatile int actualPort; private volatile int actualPort;
private ContextImpl listenContext; private ContextImpl listenContext;
private TCPMetrics metrics; private TCPMetrics metrics;
Expand Down Expand Up @@ -189,7 +189,7 @@ protected void initChannel(Channel ch) throws Exception {
bindFuture = AsyncResolveBindConnectHelper.doBind(vertx, port, host, bootstrap); bindFuture = AsyncResolveBindConnectHelper.doBind(vertx, port, host, bootstrap);
bindFuture.addListener(res -> { bindFuture.addListener(res -> {
if (res.succeeded()) { if (res.succeeded()) {
Channel ch = res.result().channel(); Channel ch = res.result();
log.trace("Net server listening on " + host + ":" + ch.localAddress()); log.trace("Net server listening on " + host + ":" + ch.localAddress());
// Update port to actual port - wildcard port 0 might have been used // Update port to actual port - wildcard port 0 might have been used
NetServerImpl.this.actualPort = ((InetSocketAddress)ch.localAddress()).getPort(); NetServerImpl.this.actualPort = ((InetSocketAddress)ch.localAddress()).getPort();
Expand Down

0 comments on commit ce91592

Please sign in to comment.