Skip to content

Commit

Permalink
Close all shared TCP based servers when Vert.x is closing - fixes #2722
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Nov 20, 2018
1 parent 25565f6 commit 4ab19ce
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 17 deletions.
3 changes: 3 additions & 0 deletions src/main/java/io/vertx/core/http/impl/HttpHandlers.java
Expand Up @@ -25,16 +25,19 @@
*/ */
public class HttpHandlers { public class HttpHandlers {


final HttpServerImpl server;
final Handler<HttpServerRequest> requestHandler; final Handler<HttpServerRequest> requestHandler;
final Handler<ServerWebSocket> wsHandler; final Handler<ServerWebSocket> wsHandler;
final Handler<HttpConnection> connectionHandler; final Handler<HttpConnection> connectionHandler;
final Handler<Throwable> exceptionHandler; final Handler<Throwable> exceptionHandler;


public HttpHandlers( public HttpHandlers(
HttpServerImpl server,
Handler<HttpServerRequest> requestHandler, Handler<HttpServerRequest> requestHandler,
Handler<ServerWebSocket> wsHandler, Handler<ServerWebSocket> wsHandler,
Handler<HttpConnection> connectionHandler, Handler<HttpConnection> connectionHandler,
Handler<Throwable> exceptionHandler) { Handler<Throwable> exceptionHandler) {
this.server = server;
this.requestHandler = requestHandler; this.requestHandler = requestHandler;
this.wsHandler = wsHandler; this.wsHandler = wsHandler;
this.connectionHandler = connectionHandler; this.connectionHandler = connectionHandler;
Expand Down
24 changes: 20 additions & 4 deletions src/main/java/io/vertx/core/http/impl/HttpServerImpl.java
Expand Up @@ -31,10 +31,7 @@
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import io.vertx.core.AsyncResult; import io.vertx.core.*;
import io.vertx.core.Closeable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*; import io.vertx.core.http.*;
import io.vertx.core.http.HttpVersion; import io.vertx.core.http.HttpVersion;
Expand Down Expand Up @@ -422,6 +419,7 @@ private void configureHttp1(ChannelPipeline pipeline, HandlerHolder<HttpHandlers
// some casting and a header check // some casting and a header check
} else { } else {
holder = new HandlerHolder<>(holder.context, new HttpHandlers( holder = new HandlerHolder<>(holder.context, new HttpHandlers(
this,
new WebSocketRequestHandler(metrics, holder.handler), new WebSocketRequestHandler(metrics, holder.handler),
holder.handler.wsHandler, holder.handler.wsHandler,
holder.handler.connectionHandler, holder.handler.connectionHandler,
Expand Down Expand Up @@ -504,6 +502,18 @@ void configureHttp2(ChannelPipeline pipeline) {
} }
} }


/**
* Internal method that closes all servers when Vert.x is closing
*/
public void closeAll(Handler<AsyncResult<Void>> handler) {
List<HttpHandlers> list = httpHandlerMgr.handlers();
List<Future> futures = list.stream()
.<Future<Void>>map(handlers -> Future.future(handlers.server::close))
.collect(Collectors.toList());
CompositeFuture fut = CompositeFuture.all(futures);
fut.setHandler(ar -> handler.handle(ar.mapEmpty()));
}

@Override @Override
public void close() { public void close() {
close(null); close(null);
Expand Down Expand Up @@ -545,6 +555,7 @@ public synchronized void close(Handler<AsyncResult<Void>> done) {


actualServer.httpHandlerMgr.removeHandler( actualServer.httpHandlerMgr.removeHandler(
new HttpHandlers( new HttpHandlers(
this,
requestStream.handler(), requestStream.handler(),
wsStream.handler(), wsStream.handler(),
connectionHandler, connectionHandler,
Expand All @@ -569,6 +580,10 @@ public synchronized void close(Handler<AsyncResult<Void>> done) {
} }
} }


public synchronized boolean isClosed() {
return !listening;
}

@Override @Override
public Metrics getMetrics() { public Metrics getMetrics() {
return metrics; return metrics;
Expand All @@ -591,6 +606,7 @@ private void applyConnectionOptions(boolean domainSocket, ServerBootstrap bootst
private void addHandlers(HttpServerImpl server, ContextInternal context) { private void addHandlers(HttpServerImpl server, ContextInternal context) {
server.httpHandlerMgr.addHandler( server.httpHandlerMgr.addHandler(
new HttpHandlers( new HttpHandlers(
this,
requestStream.handler(), requestStream.handler(),
wsStream.handler(), wsStream.handler(),
connectionHandler, connectionHandler,
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Expand Up @@ -554,7 +554,7 @@ public synchronized void close(Handler<AsyncResult<Void>> completionHandler) {
eventBus.close(ar4 -> { eventBus.close(ar4 -> {
closeClusterManager(ar5 -> { closeClusterManager(ar5 -> {
// Copy set to prevent ConcurrentModificationException // Copy set to prevent ConcurrentModificationException
Set<HttpServer> httpServers = new HashSet<>(sharedHttpServers.values()); Set<HttpServerImpl> httpServers = new HashSet<>(sharedHttpServers.values());
Set<NetServerImpl> netServers = new HashSet<>(sharedNetServers.values()); Set<NetServerImpl> netServers = new HashSet<>(sharedNetServers.values());
sharedHttpServers.clear(); sharedHttpServers.clear();
sharedNetServers.clear(); sharedNetServers.clear();
Expand All @@ -572,11 +572,11 @@ public synchronized void close(Handler<AsyncResult<Void>> completionHandler) {
} }
}; };


for (HttpServer server : httpServers) { for (HttpServerImpl server : httpServers) {
server.close(serverCloseHandler); server.closeAll(serverCloseHandler);
} }
for (NetServerImpl server : netServers) { for (NetServerImpl server : netServers) {
server.close(serverCloseHandler); server.closeAll(serverCloseHandler);
} }
if (serverCount == 0) { if (serverCount == 0) {
deleteCacheDirAndShutdown(completionHandler); deleteCacheDirAndShutdown(completionHandler);
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/io/vertx/core/net/impl/HandlerManager.java
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;


/** /**
* @author <a href="http://tfox.org">Tim Fox</a> * @author <a href="http://tfox.org">Tim Fox</a>
Expand All @@ -44,6 +45,13 @@ public boolean hasHandlers() {
return hasHandlers; return hasHandlers;
} }


public synchronized List<T> handlers() {
return handlerMap.values().stream()
.flatMap(handlers -> handlers.list.stream())
.map(holder -> holder.handler)
.collect(Collectors.toList());
}

public HandlerHolder<T> chooseHandler(EventLoop worker) { public HandlerHolder<T> chooseHandler(EventLoop worker) {
Handlers<T> handlers = handlerMap.get(worker); Handlers<T> handlers = handlerMap.get(worker);
return handlers == null ? null : handlers.chooseHandler(); return handlers == null ? null : handlers.chooseHandler();
Expand Down
34 changes: 26 additions & 8 deletions src/main/java/io/vertx/core/net/impl/NetServerImpl.java
Expand Up @@ -22,10 +22,7 @@
import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import io.vertx.core.AsyncResult; import io.vertx.core.*;
import io.vertx.core.Closeable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger; import io.vertx.core.logging.Logger;
Expand All @@ -41,9 +38,12 @@
import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.ReadStream;


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;


/** /**
* *
Expand Down Expand Up @@ -221,7 +221,7 @@ protected void initChannel(Channel ch) throws Exception {


applyConnectionOptions(socketAddress.path() != null, bootstrap); applyConnectionOptions(socketAddress.path() != null, bootstrap);


handlerManager.addHandler(new Handlers(handler, exceptionHandler), listenContext); handlerManager.addHandler(new Handlers(this, handler, exceptionHandler), listenContext);


try { try {
bindFuture = AsyncResolveConnectHelper.doBind(vertx, socketAddress, bootstrap); bindFuture = AsyncResolveConnectHelper.doBind(vertx, socketAddress, bootstrap);
Expand Down Expand Up @@ -266,7 +266,7 @@ protected void initChannel(Channel ch) throws Exception {
this.actualPort = shared.actualPort(); this.actualPort = shared.actualPort();
VertxMetrics metrics = vertx.metricsSPI(); VertxMetrics metrics = vertx.metricsSPI();
this.metrics = metrics != null ? metrics.createNetServerMetrics(options, new SocketAddressImpl(id.port, id.host)) : null; this.metrics = metrics != null ? metrics.createNetServerMetrics(options, new SocketAddressImpl(id.port, id.host)) : null;
actualServer.handlerManager.addHandler(new Handlers(handler, exceptionHandler), listenContext); actualServer.handlerManager.addHandler(new Handlers(this, handler, exceptionHandler), listenContext);
} }


// just add it to the future so it gets notified once the bind is complete // just add it to the future so it gets notified once the bind is complete
Expand Down Expand Up @@ -348,6 +348,18 @@ public ReadStream<NetSocket> connectStream() {
return connectStream; return connectStream;
} }


/**
* Internal method that closes all servers when Vert.x is closing
*/
public void closeAll(Handler<AsyncResult<Void>> handler) {
List<Handlers> list = handlerManager.handlers();
List<Future> futures = list.stream()
.<Future<Void>>map(handlers -> Future.future(handlers.server::close))
.collect(Collectors.toList());
CompositeFuture fut = CompositeFuture.all(futures);
fut.setHandler(ar -> handler.handle(ar.mapEmpty()));
}

@Override @Override
public synchronized void close(Handler<AsyncResult<Void>> completionHandler) { public synchronized void close(Handler<AsyncResult<Void>> completionHandler) {
if (creatingContext != null) { if (creatingContext != null) {
Expand Down Expand Up @@ -379,7 +391,7 @@ public synchronized void close(Handler<AsyncResult<Void>> completionHandler) {
synchronized (vertx.sharedNetServers()) { synchronized (vertx.sharedNetServers()) {


if (actualServer != null) { if (actualServer != null) {
actualServer.handlerManager.removeHandler(new Handlers(registeredHandler, exceptionHandler), listenContext); actualServer.handlerManager.removeHandler(new Handlers(this, registeredHandler, exceptionHandler), listenContext);


if (actualServer.handlerManager.hasHandlers()) { if (actualServer.handlerManager.hasHandlers()) {
// The actual server still has handlers so we don't actually close it // The actual server still has handlers so we don't actually close it
Expand All @@ -400,6 +412,10 @@ public synchronized void close(Handler<AsyncResult<Void>> completionHandler) {
} }
} }


public synchronized boolean isClosed() {
return !listening;
}

public synchronized int actualPort() { public synchronized int actualPort() {
return actualPort; return actualPort;
} }
Expand Down Expand Up @@ -532,9 +548,11 @@ public NetSocketStream exceptionHandler(Handler<Throwable> handler) {
} }


static class Handlers { static class Handlers {
final NetServer server;
final Handler<NetSocket> connectionHandler; final Handler<NetSocket> connectionHandler;
final Handler<Throwable> exceptionHandler; final Handler<Throwable> exceptionHandler;
public Handlers(Handler<NetSocket> connectionHandler, Handler<Throwable> exceptionHandler) { public Handlers(NetServer server, Handler<NetSocket> connectionHandler, Handler<Throwable> exceptionHandler) {
this.server = server;
this.connectionHandler = connectionHandler; this.connectionHandler = connectionHandler;
this.exceptionHandler = exceptionHandler; this.exceptionHandler = exceptionHandler;
} }
Expand Down
Expand Up @@ -233,7 +233,7 @@ protected void encodeHeaders(HttpHeaders headers, ByteBuf buf) {
.add(HEADER_CONTENT_LENGTH, HELLO_WORLD_LENGTH); .add(HEADER_CONTENT_LENGTH, HELLO_WORLD_LENGTH);
response.end(HELLO_WORLD_BUFFER); response.end(HELLO_WORLD_BUFFER);
}; };
HandlerHolder<HttpHandlers> holder = new HandlerHolder<>(context, new HttpHandlers(app, null, null, null)); HandlerHolder<HttpHandlers> holder = new HandlerHolder<>(context, new HttpHandlers(null, app, null, null, null));
VertxHandler<Http1xServerConnection> handler = VertxHandler.create(holder.context, chctx -> new Http1xServerConnection( VertxHandler<Http1xServerConnection> handler = VertxHandler.create(holder.context, chctx -> new Http1xServerConnection(
holder.context.owner(), holder.context.owner(),
null, null,
Expand Down
23 changes: 23 additions & 0 deletions src/test/java/io/vertx/core/http/Http1xTest.java
Expand Up @@ -15,6 +15,7 @@
import io.vertx.core.*; import io.vertx.core.*;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.impl.HttpClientRequestImpl; import io.vertx.core.http.impl.HttpClientRequestImpl;
import io.vertx.core.http.impl.HttpServerImpl;
import io.vertx.core.http.impl.HttpUtils; import io.vertx.core.http.impl.HttpUtils;
import io.vertx.core.impl.ConcurrentHashSet; import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.ContextInternal;
Expand Down Expand Up @@ -4639,4 +4640,26 @@ public void testChunkedClientRequest() {
})); }));
await(); await();
} }

@Test
public void testClosingVertxCloseSharedServers() throws Exception {
int numServers = 2;
Vertx vertx = Vertx.vertx();
List<HttpServerImpl> servers = new ArrayList<>();
for (int i = 0;i < numServers;i++) {
HttpServer server = vertx.createHttpServer(createBaseServerOptions()).requestHandler(req -> {

});
startServer(server);
servers.add((HttpServerImpl) server);
}
CountDownLatch latch = new CountDownLatch(1);
vertx.close(onSuccess(v -> {
latch.countDown();
}));
awaitLatch(latch);
servers.forEach(server -> {
assertTrue(server.isClosed());
});
}
} }
23 changes: 23 additions & 0 deletions src/test/java/io/vertx/core/net/NetTest.java
Expand Up @@ -30,6 +30,7 @@
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger; import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory; import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.impl.NetServerImpl;
import io.vertx.core.net.impl.SocketAddressImpl; import io.vertx.core.net.impl.SocketAddressImpl;
import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.ReadStream;
import io.vertx.test.core.*; import io.vertx.test.core.*;
Expand Down Expand Up @@ -1817,6 +1818,28 @@ public void testSharedServersRoundRobinButFirstStartAndStopServer() throws Excep
testSharedServersRoundRobin(); testSharedServersRoundRobin();
} }


@Test
public void testClosingVertxCloseSharedServers() throws Exception {
int numServers = 2;
Vertx vertx = Vertx.vertx();
List<NetServerImpl> servers = new ArrayList<>();
for (int i = 0;i < numServers;i++) {
NetServer server = vertx.createNetServer().connectHandler(so -> {
fail();
});
startServer(server);
servers.add((NetServerImpl) server);
}
CountDownLatch latch = new CountDownLatch(1);
vertx.close(onSuccess(v -> {
latch.countDown();
}));
awaitLatch(latch);
servers.forEach(server -> {
assertTrue(server.isClosed());
});
}

@Test @Test
// This tests using NetSocket.writeHandlerID (on the server side) // This tests using NetSocket.writeHandlerID (on the server side)
// Send some data and make sure it is fanned out to all connections // Send some data and make sure it is fanned out to all connections
Expand Down

0 comments on commit 4ab19ce

Please sign in to comment.