Skip to content
Permalink
Browse files
[NO ISSUE][OTH] Support multiple addresses in http servers
- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- Allow binding http servers to multiple addresses.
- Add test cases.

Change-Id: I68f25dc5af471c7ded29f27405c311947a007947
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9624
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
  • Loading branch information
mhubail committed Jan 18, 2021
1 parent 1ff8628 commit 572171e51c6c52b26d4e7b58a1d0940a232d09d6
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 38 deletions.
@@ -20,6 +20,9 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -39,6 +42,7 @@
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
@@ -50,6 +54,8 @@
import io.netty.handler.codec.http.HttpScheme;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

public class HttpServer {
// Constants
@@ -64,6 +70,7 @@ public class HttpServer {
private static final int STARTING = 1;
private static final int STARTED = 2;
private static final int STOPPING = 3;
private static final int RECOVERING = 4;
// Final members
private final IChannelClosedHandler closedHandler;
private final Object lock = new Object();
@@ -73,38 +80,59 @@ public class HttpServer {
private final ServletRegistry servlets;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final InetSocketAddress address;
private final InetSocketAddress defaultAddress;
private final List<InetSocketAddress> addresses;
private final ThreadPoolExecutor executor;
// Mutable members
private volatile int state = STOPPED;
private volatile Thread recoveryThread;
private volatile Channel channel;
private final List<Channel> channels;
private Throwable cause;
private HttpServerConfig config;

private final GenericFutureListener<Future<Void>> channelCloseListener = f -> {
// This listener is invoked from within a netty IO thread. Hence, we can never block it
// For simplicity, we will submit the recovery task to a different thread. We will also
// close all channels on this server and attempt to rebind them.
synchronized (lock) {
if (state != STARTED) {
return;
}
LOGGER.log(Level.WARN, "{} has stopped unexpectedly. Starting server recovery", this);
MXHelper.logFileDescriptors();
state = RECOVERING;
triggerRecovery();
}
};

public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, HttpServerConfig config) {
this(bossGroup, workerGroup, new InetSocketAddress(port), config, null);
this(bossGroup, workerGroup, Collections.singletonList(new InetSocketAddress(port)), config, null);
}

public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, InetSocketAddress address,
HttpServerConfig config) {
this(bossGroup, workerGroup, address, config, null);
HttpServerConfig config, IChannelClosedHandler closeHandler) {
this(bossGroup, workerGroup, Collections.singletonList(address), config, closeHandler);
}

public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, InetSocketAddress address,
public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, List<InetSocketAddress> addresses,
HttpServerConfig config, IChannelClosedHandler closeHandler) {
if (addresses.isEmpty()) {
throw new IllegalArgumentException("no addresses specified");
}
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
this.address = address;
this.addresses = addresses;
defaultAddress = addresses.get(0);
this.closedHandler = closeHandler;
this.config = config;
channels = new ArrayList<>();
ctx = new ConcurrentHashMap<>();
servlets = new ServletRegistry();
workQueue = new LinkedBlockingQueue<>(config.getRequestQueueSize());
int numExecutorThreads = config.getThreadCount();
executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue,
runnable -> new Thread(runnable,
"HttpExecutor(port:" + address.getPort() + ")-" + threadId.getAndIncrement()));
"HttpExecutor(port:" + defaultAddress.getPort() + ")-" + threadId.getAndIncrement()));
long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK
+ numExecutorThreads * config.getMaxResponseChunkSize();
LOGGER.log(Level.DEBUG,
@@ -128,7 +156,7 @@ public final void start() throws Exception { // NOSONAR
doStart();
setStarted();
} catch (Throwable e) { // NOSONAR
LOGGER.error("Failure starting an Http Server at: {}", address, e);
LOGGER.error("Failure starting an Http Server at: {}", defaultAddress, e);
setFailed(e);
throw e;
}
@@ -175,6 +203,8 @@ public String getState() {
return "STOPPING";
case STOPPED:
return "STOPPED";
case RECOVERING:
return "RECOVERING";
default:
return "UNKNOWN";
}
@@ -229,34 +259,31 @@ protected void doStart() throws InterruptedException, IOException {
for (IServlet servlet : servlets.getServlets()) {
servlet.init();
}
channel = bind();
bind();
}

private Channel bind() throws InterruptedException {
private void bind() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
.childOption(ChannelOption.AUTO_READ, Boolean.FALSE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK)
.handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(getChannelInitializer());
Channel newChannel = b.bind(address).sync().channel();
newChannel.closeFuture().addListener(f -> {
// This listener is invoked from within a netty IO thread. Hence, we can never block it
// For simplicity, we will submit the recovery task to a different thread
List<ChannelFuture> channelFutures = new ArrayList<>();
for (InetSocketAddress address : addresses) {
channelFutures.add(b.bind(address));
}
for (ChannelFuture future : channelFutures) {
Channel channel = future.sync().channel();
channel.closeFuture().addListener(channelCloseListener);
synchronized (lock) {
if (state != STARTED) {
return;
}
LOGGER.log(Level.WARN, "{} has stopped unexpectedly. Starting server recovery", this);
MXHelper.logFileDescriptors();
triggerRecovery();
channels.add(channel);
}
});
return newChannel;
}
}

private void triggerRecovery() {
private void triggerRecovery() throws InterruptedException {
Thread rt = recoveryThread;
if (rt != null) {
try {
@@ -267,17 +294,19 @@ private void triggerRecovery() {
return;
}
}
// try to revive the channel
// try to revive the channels
recoveryThread = new Thread(this::recover);
recoveryThread.start();
}

public void recover() {
try {
synchronized (lock) {
while (state == STARTED) {
while (state == RECOVERING) {
try {
channel = bind();
closeChannels();
bind();
setStarted();
break;
} catch (InterruptedException e) {
LOGGER.log(Level.WARN, this + " was interrupted while attempting to revive server channel", e);
@@ -329,10 +358,7 @@ protected void doStop() throws InterruptedException {
} catch (Exception e) {
LOGGER.log(Level.ERROR, "Error while shutting down http server executor", e);
}
if (channel != null) {
channel.close();
channel.closeFuture().sync();
}
closeChannels();
}

public IServlet getServlet(FullHttpRequest request) {
@@ -369,15 +395,26 @@ public HttpScheme getScheme() {

@Override
public String toString() {
return "{\"class\":\"" + getClass().getSimpleName() + "\",\"address\":" + address + ",\"state\":\"" + getState()
+ "\"}";
return "{\"class\":\"" + getClass().getSimpleName() + "\",\"address\":" + defaultAddress + ",\"state\":\""
+ getState() + "\"}";
}

public HttpServerConfig getConfig() {
return config;
}

public InetSocketAddress getAddress() {
return address;
return defaultAddress;
}

private void closeChannels() throws InterruptedException {
synchronized (lock) {
for (Channel channel : channels) {
channel.closeFuture().removeListener(channelCloseListener);
channel.close();
channel.closeFuture().sync();
}
channels.clear();
}
}
}
@@ -28,6 +28,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -263,7 +264,10 @@ public void testServerRevival() throws Exception {
WebManager webMgr = new WebManager();
final HttpServerConfig config = HttpServerConfigBuilder.custom().setThreadCount(numExecutors)
.setRequestQueueSize(serverQueueSize).build();
HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, config);
List<InetSocketAddress> addresses = new ArrayList<>();
addresses.add(new InetSocketAddress(PORT));
addresses.add(new InetSocketAddress(PORT + 1));
HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), addresses, config, null);
ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] { PATH });
server.addServlet(servlet);
webMgr.add(server);
@@ -276,12 +280,12 @@ public void testServerRevival() throws Exception {
}
Assert.assertEquals(numRequests, SUCCESS_COUNT.get());
// close the channel
Field channelField = server.getClass().getDeclaredField("channel");
Field channelField = server.getClass().getDeclaredField("channels");
channelField.setAccessible(true);
Field recoveryThreadField = server.getClass().getDeclaredField("recoveryThread");
recoveryThreadField.setAccessible(true);
Channel channel = (Channel) channelField.get(server);
channel.close();
List<Channel> channels = (ArrayList<Channel>) channelField.get(server);
channels.get(0).close();
Thread.sleep(1000);
final int sleeps = 10;
for (int i = 0; i < sleeps; i++) {
@@ -409,6 +413,43 @@ public void chunkedRequestTest() throws Exception {
}
}

@Test
public void multiAddressServerTest() throws Exception {
final WebManager webMgr = new WebManager();
final HttpServerConfig config =
HttpServerConfigBuilder.custom().setThreadCount(16).setRequestQueueSize(16).build();
List<Integer> ports = Arrays.asList(PORT, PORT + 1);
List<InetSocketAddress> addresses = new ArrayList<>();
for (Integer port : ports) {
addresses.add(new InetSocketAddress(port));
}
HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), addresses, config, null);
EchoServlet servlet = new EchoServlet(server.ctx(), PATH);
server.addServlet(servlet);
webMgr.add(server);
webMgr.start();
try {
for (Integer port : ports) {
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
final URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, port,
HttpServerTest.PATH, null, null);
final HttpPost postRequest = new HttpPost(uri);
final String requestBody = "test";
final StringEntity chunkedEntity = new StringEntity(requestBody);
chunkedEntity.setChunked(true);
postRequest.setEntity(chunkedEntity);
try (CloseableHttpResponse response = httpClient.execute(postRequest)) {
final String responseBody = EntityUtils.toString(response.getEntity());
Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpResponseStatus.OK.code());
Assert.assertEquals(responseBody, requestBody);
}
}
}
} finally {
webMgr.stop();
}
}

private void request(int count) throws URISyntaxException {
request(count, 0);
}

0 comments on commit 572171e

Please sign in to comment.