From 42c88e492069e7c1a17a19cdb67c9535f97bf8a1 Mon Sep 17 00:00:00 2001 From: Michael Wei Date: Mon, 11 Dec 2017 20:20:53 -0800 Subject: [PATCH 1/4] Refactor CorfuServer so that code can be reused in tests --- .../corfudb/infrastructure/BaseServer.java | 12 +- .../corfudb/infrastructure/CorfuServer.java | 379 ++++++++++-------- .../corfudb/infrastructure/LayoutServer.java | 11 +- .../infrastructure/NettyServerRouter.java | 53 +-- .../corfudb/infrastructure/ServerContext.java | 20 +- .../infrastructure/BaseServerTest.java | 2 +- .../infrastructure/ManagementServerTest.java | 2 +- .../infrastructure/ServerContextBuilder.java | 26 +- .../runtime/clients/BaseClientTest.java | 3 +- .../runtime/clients/ManagementClientTest.java | 2 +- .../clients/NettyClientRouterTest.java | 3 +- .../runtime/clients/NettyCommTest.java | 316 ++++++--------- .../runtime/clients/TestClientRouterTest.java | 7 +- .../runtime/view/AbstractViewTest.java | 12 +- 14 files changed, 416 insertions(+), 432 deletions(-) diff --git a/infrastructure/src/main/java/org/corfudb/infrastructure/BaseServer.java b/infrastructure/src/main/java/org/corfudb/infrastructure/BaseServer.java index fd389ddfdea..8d8aba2630f 100644 --- a/infrastructure/src/main/java/org/corfudb/infrastructure/BaseServer.java +++ b/infrastructure/src/main/java/org/corfudb/infrastructure/BaseServer.java @@ -6,6 +6,7 @@ import java.util.HashMap; import java.util.Map; +import javax.annotation.Nonnull; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -22,10 +23,11 @@ @Slf4j public class BaseServer extends AbstractServer { - /** Options map, if available. */ - @Getter - @Setter - public Map optionsMap = new HashMap<>(); + final ServerContext serverContext; + + public BaseServer(@Nonnull ServerContext context) { + this.serverContext = context; + } /** Handler for the base server. */ @Getter @@ -55,7 +57,7 @@ private static void ping(CorfuMsg msg, ChannelHandlerContext ctx, @ServerHandler(type = CorfuMsgType.VERSION_REQUEST, opTimer = metricsPrefix + "version-request") private void getVersion(CorfuMsg msg, ChannelHandlerContext ctx, IServerRouter r, boolean isMetricsEnabled) { - VersionInfo vi = new VersionInfo(optionsMap); + VersionInfo vi = new VersionInfo(serverContext.getServerConfig()); r.sendResponse(ctx, msg, new JSONPayloadMsg<>(vi, CorfuMsgType.VERSION_RESPONSE)); } diff --git a/infrastructure/src/main/java/org/corfudb/infrastructure/CorfuServer.java b/infrastructure/src/main/java/org/corfudb/infrastructure/CorfuServer.java index 80fde6a6aed..432f87cbc78 100644 --- a/infrastructure/src/main/java/org/corfudb/infrastructure/CorfuServer.java +++ b/infrastructure/src/main/java/org/corfudb/infrastructure/CorfuServer.java @@ -1,26 +1,31 @@ package org.corfudb.infrastructure; +import static org.fusesource.jansi.Ansi.Color.BLUE; +import static org.fusesource.jansi.Ansi.Color.MAGENTA; +import static org.fusesource.jansi.Ansi.Color.RED; +import static org.fusesource.jansi.Ansi.Color.WHITE; +import static org.fusesource.jansi.Ansi.ansi; + import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Logger; - import com.google.common.collect.ImmutableList; - +import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.EventExecutorGroup; - import java.io.File; import java.util.List; import java.util.Map; @@ -29,32 +34,26 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.regex.Pattern; import java.util.stream.Collectors; - +import javax.annotation.Nonnull; import javax.net.ssl.SSLEngine; - import javax.net.ssl.SSLException; import lombok.Getter; import lombok.extern.slf4j.Slf4j; - import org.corfudb.protocols.wireprotocol.NettyCorfuMessageDecoder; import org.corfudb.protocols.wireprotocol.NettyCorfuMessageEncoder; +import org.corfudb.runtime.exceptions.unrecoverable.UnrecoverableCorfuError; +import org.corfudb.runtime.exceptions.unrecoverable.UnrecoverableCorfuInterruptedError; import org.corfudb.security.sasl.plaintext.PlainTextSaslNettyServer; import org.corfudb.security.tls.SslContextConstructor; -import org.corfudb.security.tls.TlsUtils; import org.corfudb.util.GitRepositoryState; import org.corfudb.util.Version; import org.docopt.Docopt; import org.fusesource.jansi.AnsiConsole; import org.slf4j.LoggerFactory; -import static org.fusesource.jansi.Ansi.Color.BLUE; -import static org.fusesource.jansi.Ansi.Color.MAGENTA; -import static org.fusesource.jansi.Ansi.Color.RED; -import static org.fusesource.jansi.Ansi.Color.WHITE; -import static org.fusesource.jansi.Ansi.ansi; - /** * This is the new Corfu server single-process executable. * @@ -165,20 +164,8 @@ public class CorfuServer { + " Show this screen\n" + " --version " + " Show version\n"; - public static boolean serverRunning = false; - @Getter - private static SequencerServer sequencerServer; - @Getter - private static LayoutServer layoutServer; - @Getter - private static LogUnitServer logUnitServer; - @Getter - private static ManagementServer managementServer; - private static NettyServerRouter router; + private static ServerContext serverContext; - private static SslContext sslContext; - private static String[] enabledTlsProtocols; - private static String[] enabledTlsCipherSuites; /** * Print the corfu logo. @@ -208,8 +195,6 @@ public static void printLogo() { * @param args command line argument strings */ public static void main(String[] args) { - serverRunning = true; - // Parse the options given, using docopt. Map opts = new Docopt(USAGE).withVersion(GitRepositoryState.getRepositoryState().describe) @@ -269,169 +254,231 @@ public static void main(String[] args) { } } - // Now, we start the Netty router, and have it route to the correct port. - router = new NettyServerRouter(opts); - // Create a common Server Context for all servers to access. - serverContext = new ServerContext(opts, router); - - // Add each role to the router. - addSequencer(); - addLayoutServer(); - addLogUnit(); - addManagementServer(); - router.baseServer.setOptionsMap(opts); - - // Setup SSL if needed - Boolean tlsEnabled = (Boolean) opts.get("--enable-tls"); - Boolean tlsMutualAuthEnabled = (Boolean) opts.get("--enable-tls-mutual-auth"); + serverContext = new ServerContext(opts); + + List servers = ImmutableList.builder() + .add(new BaseServer(serverContext)) + .add(new SequencerServer(serverContext)) + .add(new LayoutServer(serverContext)) + .add(new LogUnitServer(serverContext)) + .add(new ManagementServer(serverContext)) + .build(); + + NettyServerRouter router = new NettyServerRouter(servers); + + // Register shutdown handler + Thread shutdownThread = new Thread(() -> cleanShutdown(router)); + shutdownThread.setName("ShutdownThread"); + Runtime.getRuntime().addShutdownHook(shutdownThread); + + // Create the event loops responsible for servicing inbound messages. + final EventLoopGroup bossGroup = getBossGroup(serverContext); + final EventLoopGroup workerGroup = getWorkerGroup(serverContext); + + try { + startAndListen(bossGroup, + workerGroup, + NioServerSocketChannel.class, + b -> configureBootstrapOptions(serverContext, b), + serverContext, + router, + port).channel().closeFuture().syncUninterruptibly(); + + } finally { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + cleanShutdown(router); + } + } + + /** Start the Corfu server and bind it to the given {@code port} using the provided + * {@code channelType}. It is the callers' responsibility to shutdown the + * {@link EventLoopGroup}s. For implementations which listen on multiple ports, + * {@link EventLoopGroup}s may be reused. + * + * @param bossGroup The "boss" {@link EventLoopGroup} which services incoming + * connections. + * @param workerGroup The "worker" {@link EventLoopGroup} which services incoming + * requests. + * @param channelType The type of {@link ServerChannel} to use. + * @param bootstrapConsumer A consumer which will receive the {@link ServerBootstrap} to + * set options. + * @param context A {@link ServerContext} which will be used to configure the + * server. + * @param router A {@link NettyServerRouter} which will process incoming + * messages. + * @param port The port the {@link ServerChannel} will be created on. + * @param The type of the {@link ServerChannel}. + * @return A {@link ChannelFuture} which can be used to wait for the server + * to be shutdown. + */ + public static + ChannelFuture startAndListen(@Nonnull EventLoopGroup bossGroup, + @Nonnull EventLoopGroup workerGroup, + @Nonnull Class channelType, + @Nonnull Consumer bootstrapConsumer, + @Nonnull ServerContext context, + @Nonnull NettyServerRouter router, + int port) { + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(channelType); + bootstrapConsumer.accept(bootstrap); + + bootstrap.childHandler(getServerChannelInitializer(context, router)); + return bootstrap.bind(port).sync(); + } catch (InterruptedException ie) { + throw new UnrecoverableCorfuInterruptedError(ie); + } + } + + /** Configure server bootstrap per-channel options, such as TCP options, etc. + * + * @param context The {@link ServerContext} to use. + * @param bootstrap The {@link ServerBootstrap} to be configured. + */ + public static void configureBootstrapOptions(@Nonnull ServerContext context, + @Nonnull ServerBootstrap bootstrap) { + bootstrap.option(ChannelOption.SO_BACKLOG, 100) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + } + + /** Get a "boss" group, which services (accepts) incoming connections. + * + * @param context The {@link ServerContext} to use. + * @return A boss group. + */ + public static EventLoopGroup getBossGroup(@Nonnull ServerContext context) { + final ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("accept-%d") + .build(); + EventLoopGroup group = new NioEventLoopGroup(1, threadFactory); + log.info("getBossGroup: Type {}", group.getClass().getSimpleName()); + return group; + } + + /** Get a "worker" group, which services incoming requests. + * + * @param context The {@link ServerContext} to use. + * @return A worker group. + */ + public static @Nonnull EventLoopGroup getWorkerGroup(@Nonnull ServerContext context) { + final ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("worker-%d") + .build(); + + final int numThreads = Runtime.getRuntime().availableProcessors() * 2; + EventLoopGroup group = new NioEventLoopGroup(numThreads, threadFactory); + log.info("getWorkerGroup: Type {} with {} threads", + group.getClass().getSimpleName(), numThreads); + return group; + } + + + /** Obtain a {@link ChannelInitializer} which initializes the channel pipeline + * for a new {@link ServerChannel}. + * + * @param context The {@link ServerContext} to use. + * @param router The {@link NettyServerRouter} to initialize the channel with. + * @return A {@link ChannelInitializer} to intialize the channel. + */ + private static ChannelInitializer getServerChannelInitializer(@Nonnull ServerContext context, + @Nonnull NettyServerRouter router) { + // Security variables + final SslContext sslContext; + final String[] enabledTlsProtocols; + final String[] enabledTlsCipherSuites; + + // Security Initialization + Boolean tlsEnabled = context.getServerConfig(Boolean.class, "--enable-tls"); + Boolean tlsMutualAuthEnabled = context.getServerConfig(Boolean.class, + "--enable-tls-mutual-auth"); if (tlsEnabled) { // Get the TLS cipher suites to enable - String ciphs = (String) opts.get("--tls-ciphers"); + String ciphs = context.getServerConfig(String.class, "--tls-ciphers"); if (ciphs != null) { List ciphers = Pattern.compile(",") .splitAsStream(ciphs) .map(String::trim) .collect(Collectors.toList()); enabledTlsCipherSuites = ciphers.toArray(new String[ciphers.size()]); + } else { + enabledTlsCipherSuites = new String[]{}; } // Get the TLS protocols to enable - String protos = (String) opts.get("--tls-protocols"); + String protos = context.getServerConfig(String.class, "--tls-protocols"); if (protos != null) { List protocols = Pattern.compile(",") .splitAsStream(protos) .map(String::trim) .collect(Collectors.toList()); enabledTlsProtocols = protocols.toArray(new String[protocols.size()]); + } else { + enabledTlsProtocols = new String[]{}; } + try { - sslContext = SslContextConstructor.constructSslContext(true, (String) opts.get("--keystore"), - (String) opts.get("--keystore-password-file"), - (String) opts.get("--truststore"), (String) opts.get("--truststore-password-file")); + sslContext = SslContextConstructor.constructSslContext(true, + context.getServerConfig(String.class, "--keystore"), + context.getServerConfig(String.class, "--keystore-password-file"), + context.getServerConfig(String.class, "--truststore"), + context.getServerConfig(String.class, + "--truststore-password-file")); } catch (SSLException e) { log.error("Could not build the SSL context", e); - System.exit(1); + throw new UnrecoverableCorfuError("Couldn't build the SSL context", e); } + } else { + enabledTlsCipherSuites = new String[]{}; + enabledTlsProtocols = new String[]{}; + sslContext = null; } - Boolean saslPlainTextAuth = (Boolean) opts.get("--enable-sasl-plain-text-auth"); - - // Create the event loops responsible for servicing inbound messages. - EventLoopGroup bossGroup; - EventLoopGroup workerGroup; - EventExecutorGroup ee; - - bossGroup = new NioEventLoopGroup(1, new ThreadFactory() { - final AtomicInteger threadNum = new AtomicInteger(0); + Boolean saslPlainTextAuth = context.getServerConfig(Boolean.class, + "--enable-sasl-plain-text-auth"); + // Generate the initializer. + return new ChannelInitializer() { @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("accept-" + threadNum.getAndIncrement()); - return t; - } - }); - - workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2, new - ThreadFactory() { - final AtomicInteger threadNum = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("io-" + threadNum.getAndIncrement()); - return t; - } - }); - - ee = new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors() * 2, new - ThreadFactory() { - - final AtomicInteger threadNum = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("event-" + threadNum.getAndIncrement()); - return t; + protected void initChannel(@Nonnull Channel ch) throws Exception { + if (tlsEnabled) { + SSLEngine engine = sslContext.newEngine(ch.alloc()); + engine.setEnabledCipherSuites(enabledTlsCipherSuites); + engine.setEnabledProtocols(enabledTlsProtocols); + if (tlsMutualAuthEnabled) { + engine.setNeedClientAuth(true); } - }); - - - // Register shutdown handler - Thread shutdownThread = new Thread(CorfuServer::cleanShutdown); - shutdownThread.setName("ShutdownThread"); - Runtime.getRuntime().addShutdownHook( - shutdownThread); - - try { - ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .option(ChannelOption.SO_BACKLOG, 100) - .childOption(ChannelOption.SO_KEEPALIVE, true) - .childOption(ChannelOption.SO_REUSEADDR, true) - .childOption(ChannelOption.TCP_NODELAY, true) - .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(io.netty.channel.socket.SocketChannel ch) throws - Exception { - if (tlsEnabled) { - SSLEngine engine = sslContext.newEngine(ch.alloc()); - engine.setEnabledCipherSuites(enabledTlsCipherSuites); - engine.setEnabledProtocols(enabledTlsProtocols); - if (tlsMutualAuthEnabled) { - engine.setNeedClientAuth(true); - } - ch.pipeline().addLast("ssl", new SslHandler(engine)); - } - ch.pipeline().addLast(new LengthFieldPrepender(4)); - ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer - .MAX_VALUE, 0, 4, 0, 4)); - if (saslPlainTextAuth) { - ch.pipeline().addLast("sasl/plain-text", new - PlainTextSaslNettyServer()); - } - ch.pipeline().addLast(ee, new NettyCorfuMessageDecoder()); - ch.pipeline().addLast(ee, new NettyCorfuMessageEncoder()); - ch.pipeline().addLast(ee, router); - } - }); - ChannelFuture f = b.bind(port).sync(); - while (true) { - try { - f.channel().closeFuture().sync(); - } catch (InterruptedException ie) { - System.out.println(ie.getStackTrace()); + ch.pipeline().addLast("ssl", new SslHandler(engine)); + } + ch.pipeline().addLast(new LengthFieldPrepender(4)); + ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer + .MAX_VALUE, 0, 4, + 0, 4)); + if (saslPlainTextAuth) { + ch.pipeline().addLast("sasl/plain-text", new + PlainTextSaslNettyServer()); } + ch.pipeline().addLast(new NettyCorfuMessageDecoder()); + ch.pipeline().addLast(new NettyCorfuMessageEncoder()); + ch.pipeline().addLast(router); } - - } catch (InterruptedException ie) { - System.out.println(ie.getStackTrace()); - } catch (Exception ex) { - log.error("Corfu server shut down unexpectedly due to exception", ex); - } finally { - bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); - cleanShutdown(); - } - + }; } /** * Attempt to cleanly shutdown all the servers. */ - public static void cleanShutdown() { + public static void cleanShutdown(@Nonnull NettyServerRouter router) { log.info("CleanShutdown: Starting Cleanup."); // Create a list of servers - final List servers = - ImmutableList.of(sequencerServer, - managementServer, - layoutServer, - logUnitServer); + final List servers = router.getServers(); // A executor service to create the shutdown threads // plus name the threads correctly. @@ -460,24 +507,4 @@ public static void cleanShutdown() { CompletableFuture.allOf(shutdownFutures).join(); log.info("CleanShutdown: Shutdown Complete."); } - - public static void addSequencer() { - sequencerServer = new SequencerServer(serverContext); - router.addServer(sequencerServer); - } - - public static void addLayoutServer() { - layoutServer = new LayoutServer(serverContext); - router.addServer(layoutServer); - } - - public static void addLogUnit() { - logUnitServer = new LogUnitServer(serverContext); - router.addServer(logUnitServer); - } - - public static void addManagementServer() { - managementServer = new ManagementServer(serverContext); - router.addServer(managementServer); - } } diff --git a/infrastructure/src/main/java/org/corfudb/infrastructure/LayoutServer.java b/infrastructure/src/main/java/org/corfudb/infrastructure/LayoutServer.java index 09188882e65..7197c2de53e 100644 --- a/infrastructure/src/main/java/org/corfudb/infrastructure/LayoutServer.java +++ b/infrastructure/src/main/java/org/corfudb/infrastructure/LayoutServer.java @@ -178,6 +178,7 @@ public synchronized void handleMessageLayoutBootstrap( msg.getPayload().getLayout(), msg); setCurrentLayout(msg.getPayload().getLayout()); serverContext.setServerEpoch(getCurrentLayout().getEpoch()); + r.setServerEpoch(getCurrentLayout().getEpoch()); //send a response that the bootstrap was successful. r.sendResponse(ctx, msg, new CorfuMsg(CorfuMsgType.ACK)); } else { @@ -207,7 +208,8 @@ public synchronized void handleMessageSetEpoch(@NonNull CorfuPayloadMsg ms if (msg.getPayload() >= serverEpoch) { log.info("handleMessageSetEpoch: Received SET_EPOCH, moving to new epoch {}", msg.getPayload()); - setServerEpoch(msg.getPayload()); + serverContext.setServerEpoch(msg.getPayload()); + r.setServerEpoch(msg.getPayload()); r.sendResponse(ctx, msg, new CorfuMsg(CorfuMsgType.ACK)); } else { log.debug("handleMessageSetEpoch: Rejected SET_EPOCH current={}, requested={}", @@ -364,7 +366,8 @@ public synchronized void handleMessageLayoutCommit( } setCurrentLayout(commitLayout); - setServerEpoch(msg.getPayload().getEpoch()); + serverContext.setServerEpoch(msg.getPayload().getEpoch()); + r.setServerEpoch(msg.getPayload().getEpoch()); r.sendResponse(ctx, msg, new CorfuMsg(CorfuMsgType.ACK)); } @@ -409,10 +412,6 @@ public void setLayoutInHistory(Layout layout) { .getEpoch()), layout); } - private void setServerEpoch(long serverEpoch) { - serverContext.setServerEpoch(serverEpoch); - } - private long getServerEpoch() { return serverContext.getServerEpoch(); } diff --git a/infrastructure/src/main/java/org/corfudb/infrastructure/NettyServerRouter.java b/infrastructure/src/main/java/org/corfudb/infrastructure/NettyServerRouter.java index b1771e20e3f..6705e0128b2 100644 --- a/infrastructure/src/main/java/org/corfudb/infrastructure/NettyServerRouter.java +++ b/infrastructure/src/main/java/org/corfudb/infrastructure/NettyServerRouter.java @@ -4,6 +4,8 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import java.util.EnumMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -86,9 +88,7 @@ protected static void handleUncaughtException(Thread t, @Nonnull Throwable e) { /** * This map stores the mapping from message type to netty server handler. */ - Map handlerMap; - - BaseServer baseServer; + private final Map handlerMap; /** * The epoch of this router. This is managed by the base server implementation. @@ -97,41 +97,32 @@ protected static void handleUncaughtException(Thread t, @Nonnull Throwable e) { @Setter long serverEpoch; - /** - * Returns a new NettyServerRouter. - * @param opts map of options (FIXME: unused) - */ - public NettyServerRouter(Map opts) { - handlerMap = new ConcurrentHashMap<>(); - baseServer = new BaseServer(); - addServer(baseServer); - } + /** The {@link AbstractServer}s this {@link NettyServerRouter} routes messages for. */ + @Getter + final List servers; - /** - * Add a new netty server handler to the router. + /** Construct an new {@link NettyServerRouter}. * - * @param server The server to add. + * @param servers A list of {@link AbstractServer}s this router will route + * messages for. */ - public void addServer(AbstractServer server) { - // Iterate through all types of CorfuMsgType, registering the handler - server.getHandler().getHandledTypes() - .forEach(x -> { - handlerMap.put(x, server); - log.trace("Registered {} to handle messages of type {}", server, x); - }); + public NettyServerRouter(List servers) { + this.servers = servers; + handlerMap = new EnumMap<>(CorfuMsgType.class); + servers.forEach(server -> server.getHandler().getHandledTypes() + .forEach(x -> handlerMap.put(x, server))); } /** - * Remove a server from the router. - * @param server server to remove + * {@inheritDoc} + * + *

This operation is no longer supported. The router will only route messages for + * servers provided at construction time. */ - public void removeServer(AbstractServer server) { - // Iterate through all types of CorfuMsgType, un-registering the handler - server.getHandler().getHandledTypes() - .forEach(x -> { - handlerMap.remove(x, server); - log.trace("Un-Registered {} to handle messages of type {}", server, x); - }); + @Override + @Deprecated + public void addServer(AbstractServer server) { + throw new UnsupportedOperationException("No longer supported"); } /** diff --git a/infrastructure/src/main/java/org/corfudb/infrastructure/ServerContext.java b/infrastructure/src/main/java/org/corfudb/infrastructure/ServerContext.java index 2d2fec4d240..5a43fd8937a 100644 --- a/infrastructure/src/main/java/org/corfudb/infrastructure/ServerContext.java +++ b/infrastructure/src/main/java/org/corfudb/infrastructure/ServerContext.java @@ -52,6 +52,7 @@ public class ServerContext { private final DataStore dataStore; @Getter + @Setter private IServerRouter serverRouter; @Getter @@ -68,12 +69,10 @@ public class ServerContext { /** * Returns a new ServerContext. * @param serverConfig map of configuration strings to objects - * @param serverRouter server router */ - public ServerContext(Map serverConfig, IServerRouter serverRouter) { + public ServerContext(Map serverConfig) { this.serverConfig = serverConfig; this.dataStore = new DataStore(serverConfig); - this.serverRouter = serverRouter; this.failureDetectorPolicy = new PeriodicPollPolicy(); this.failureHandlerPolicy = new ConservativeFailureHandlerPolicy(); @@ -88,6 +87,18 @@ public ServerContext(Map serverConfig, IServerRouter serverRoute } } + /** Get a field from the server configuration map. + * + * @param type The type of the field. + * @param optionName The name of the option to retrieve. + * @param The type of the field to return. + * @return The field with the give option name. + */ + @SuppressWarnings("unchecked") + public T getServerConfig(Class type, String optionName) { + return (T) getServerConfig().get(optionName); + } + /** * The epoch of this router. This is managed by the base server implementation. */ @@ -102,9 +113,6 @@ public long getServerEpoch() { */ public void setServerEpoch(long serverEpoch) { dataStore.put(Long.class, PREFIX_EPOCH, KEY_EPOCH, serverEpoch); - // Set the epoch in the router as well. - //TODO need to figure out if we can remove this redundancy - serverRouter.setServerEpoch(serverEpoch); } public long getTailSegment() { diff --git a/test/src/test/java/org/corfudb/infrastructure/BaseServerTest.java b/test/src/test/java/org/corfudb/infrastructure/BaseServerTest.java index cd6c0c27327..d21cd6a4355 100644 --- a/test/src/test/java/org/corfudb/infrastructure/BaseServerTest.java +++ b/test/src/test/java/org/corfudb/infrastructure/BaseServerTest.java @@ -15,7 +15,7 @@ public class BaseServerTest extends AbstractServerTest { @Override public AbstractServer getDefaultServer() { if (bs == null) { - bs = new BaseServer(); + bs = new BaseServer(ServerContextBuilder.defaultContext(0)); } return bs; } diff --git a/test/src/test/java/org/corfudb/infrastructure/ManagementServerTest.java b/test/src/test/java/org/corfudb/infrastructure/ManagementServerTest.java index 1ec1575078d..7d9a6d7509b 100644 --- a/test/src/test/java/org/corfudb/infrastructure/ManagementServerTest.java +++ b/test/src/test/java/org/corfudb/infrastructure/ManagementServerTest.java @@ -32,7 +32,7 @@ public ManagementServer getDefaultServer() { .build(); // Required for management server to fetch layout. router.addServer(new LayoutServer(serverContext)); - router.addServer(new BaseServer()); + router.addServer(new BaseServer(ServerContextBuilder.defaultContext(0))); // Required to fetch global tails while handling failures. router.addServer(new LogUnitServer(serverContext)); // Required for management server to bootstrap during initialization. diff --git a/test/src/test/java/org/corfudb/infrastructure/ServerContextBuilder.java b/test/src/test/java/org/corfudb/infrastructure/ServerContextBuilder.java index 0512a315791..576331b95ff 100644 --- a/test/src/test/java/org/corfudb/infrastructure/ServerContextBuilder.java +++ b/test/src/test/java/org/corfudb/infrastructure/ServerContextBuilder.java @@ -19,7 +19,17 @@ public class ServerContextBuilder { boolean memory = true; String logPath = null; boolean noVerify = false; + boolean tlsEnabled = false; + boolean tlsMutualAuthEnabled = false; + String tlsProtocols = ""; + String tlsCiphers = ""; + String keystore = ""; + String keystorePasswordFile = ""; + boolean saslPlainTextAuth = false; + String truststore = ""; + String truststorePasswordFile = ""; + String cacheSizeHeapRatio = "0.5"; String address = "test"; int port = 9000; @@ -49,12 +59,24 @@ public ServerContext build() { .put("--address", address) .put("--cache-heap-ratio", cacheSizeHeapRatio) .put("--enable-tls", tlsEnabled) + .put("--enable-tls-mutual-auth", tlsMutualAuthEnabled) + .put("--tls-protocols", tlsProtocols) + .put("--tls-ciphers", tlsCiphers) + .put("--keystore", keystore) + .put("--keystore-password-file", keystorePasswordFile) + .put("--truststore", truststore) + .put("--truststore-password-file", truststorePasswordFile) + .put("--enable-sasl-plain-text-auth", saslPlainTextAuth) .put("", port); - return new ServerContext(builder.build(), serverRouter); + ServerContext sc = new ServerContext(builder.build()); + sc.setServerRouter(serverRouter); + return sc; } public static ServerContext defaultContext(int port) { - return new ServerContextBuilder().setPort(port).build(); + ServerContext sc = new ServerContextBuilder().setPort(port).build(); + sc.setServerRouter(new TestServerRouter()); + return sc; } public static ServerContext emptyContext() { diff --git a/test/src/test/java/org/corfudb/runtime/clients/BaseClientTest.java b/test/src/test/java/org/corfudb/runtime/clients/BaseClientTest.java index a0b0acafacd..2c6677c0689 100644 --- a/test/src/test/java/org/corfudb/runtime/clients/BaseClientTest.java +++ b/test/src/test/java/org/corfudb/runtime/clients/BaseClientTest.java @@ -3,6 +3,7 @@ import com.google.common.collect.ImmutableSet; import org.corfudb.infrastructure.AbstractServer; import org.corfudb.infrastructure.BaseServer; +import org.corfudb.infrastructure.ServerContextBuilder; import org.corfudb.util.CFUtils; import org.junit.Test; @@ -18,7 +19,7 @@ public class BaseClientTest extends AbstractClientTest { @Override Set getServersForTest() { return new ImmutableSet.Builder() - .add(new BaseServer()) + .add(new BaseServer(ServerContextBuilder.defaultContext(0))) .build(); } diff --git a/test/src/test/java/org/corfudb/runtime/clients/ManagementClientTest.java b/test/src/test/java/org/corfudb/runtime/clients/ManagementClientTest.java index 69dedee6602..0b7b687e4f6 100644 --- a/test/src/test/java/org/corfudb/runtime/clients/ManagementClientTest.java +++ b/test/src/test/java/org/corfudb/runtime/clients/ManagementClientTest.java @@ -41,7 +41,7 @@ Set getServersForTest() { // Required for management server to be able to bootstrap the sequencer. .add(new SequencerServer(serverContext)) .add(new LogUnitServer(serverContext)) - .add(new BaseServer()) + .add(new BaseServer(ServerContextBuilder.defaultContext(0))) .build(); } diff --git a/test/src/test/java/org/corfudb/runtime/clients/NettyClientRouterTest.java b/test/src/test/java/org/corfudb/runtime/clients/NettyClientRouterTest.java index 2375dc9bb24..c1021dad5ab 100644 --- a/test/src/test/java/org/corfudb/runtime/clients/NettyClientRouterTest.java +++ b/test/src/test/java/org/corfudb/runtime/clients/NettyClientRouterTest.java @@ -1,5 +1,6 @@ package org.corfudb.runtime.clients; +import org.corfudb.infrastructure.ServerContextBuilder; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -13,7 +14,7 @@ public void doesNotUpdateEpochBackward() throws Exception{ runWithBaseServer( (port) -> { - return new NettyServerData(port); + return new NettyServerData(ServerContextBuilder.defaultContext(port)); }, (port) -> { return new NettyClientRouter("localhost", port); diff --git a/test/src/test/java/org/corfudb/runtime/clients/NettyCommTest.java b/test/src/test/java/org/corfudb/runtime/clients/NettyCommTest.java index 4cc81e2fdf9..491e7a46940 100644 --- a/test/src/test/java/org/corfudb/runtime/clients/NettyCommTest.java +++ b/test/src/test/java/org/corfudb/runtime/clients/NettyCommTest.java @@ -21,12 +21,18 @@ import java.io.File; import java.nio.file.Files; import java.nio.file.StandardCopyOption; +import java.util.Collections; +import javax.annotation.Nonnull; import javax.net.ssl.SSLException; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.apache.maven.settings.Server; import org.corfudb.AbstractCorfuTest; import org.corfudb.infrastructure.BaseServer; +import org.corfudb.infrastructure.CorfuServer; import org.corfudb.infrastructure.NettyServerRouter; +import org.corfudb.infrastructure.ServerContext; +import org.corfudb.infrastructure.ServerContextBuilder; import org.corfudb.protocols.wireprotocol.NettyCorfuMessageDecoder; import org.corfudb.protocols.wireprotocol.NettyCorfuMessageEncoder; import org.corfudb.security.sasl.plaintext.PlainTextSaslNettyServer; @@ -67,7 +73,7 @@ private Integer findRandomOpenPort() throws IOException { public void nettyServerClientPingable() throws Exception { runWithBaseServer( (port) -> { - return new NettyServerData(port); + return new NettyServerData(ServerContextBuilder.defaultContext(port)); }, (port) -> { return new NettyClientRouter("localhost", port); @@ -82,7 +88,7 @@ public void nettyServerClientPingable() throws Exception { public void nettyServerClientPingableAfterFailure() throws Exception { runWithBaseServer( (port) -> { - return new NettyServerData(port); + return new NettyServerData(ServerContextBuilder.defaultContext(port)); }, (port) -> { return new NettyClientRouter("localhost", port); @@ -101,17 +107,18 @@ public void nettyServerClientPingableAfterFailure() throws Exception { public void nettyTlsNoMutualAuth() throws Exception { runWithBaseServer( (port) -> { - NettyServerData d = new NettyServerData(port); - String[] ciphers = {"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"}; - String[] protocols = {"TLSv1.2"}; - d.enableTls( - "src/test/resources/security/s1.jks", - "src/test/resources/security/storepass", - "src/test/resources/security/s1.jks", - "src/test/resources/security/storepass", - false, - ciphers, - protocols); + NettyServerData d = new NettyServerData( + new ServerContextBuilder() + .setTlsEnabled(true) + .setTlsCiphers("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256") + .setTlsProtocols("TLSv1.2") + .setKeystore("src/test/resources/security/s1.jks") + .setKeystorePasswordFile("src/test/resources/security/storepass") + .setTruststore("src/test/resources/security/s1.jks") + .setTruststorePasswordFile("src/test/resources/security/storepass") + .setPort(port) + .build() + ); return d; }, (port) -> { @@ -133,17 +140,17 @@ public void nettyTlsNoMutualAuth() throws Exception { public void nettyTlsMutualAuth() throws Exception { runWithBaseServer( (port) -> { - NettyServerData d = new NettyServerData(port); - String[] ciphers = {"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"}; - String[] protocols = {"TLSv1.2"}; - d.enableTls( - "src/test/resources/security/s1.jks", - "src/test/resources/security/storepass", - "src/test/resources/security/trust1.jks", - "src/test/resources/security/storepass", - true, - ciphers, - protocols); + NettyServerData d = new NettyServerData(new ServerContextBuilder() + .setTlsEnabled(true) + .setTlsCiphers("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256") + .setTlsProtocols("TLSv1.2") + .setKeystore("src/test/resources/security/s1.jks") + .setKeystorePasswordFile("src/test/resources/security/storepass") + .setTruststore("src/test/resources/security/trust1.jks") + .setTruststorePasswordFile("src/test/resources/security/storepass") + .setTlsMutualAuthEnabled(true) + .setPort(port) + .build()); return d; }, (port) -> { @@ -165,17 +172,17 @@ public void nettyTlsMutualAuth() throws Exception { public void nettyTlsUnknownServer() throws Exception { runWithBaseServer( (port) -> { - NettyServerData d = new NettyServerData(port); - String[] ciphers = {"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"}; - String[] protocols = {"TLSv1.2"}; - d.enableTls( - "src/test/resources/security/s3.jks", - "src/test/resources/security/storepass", - "src/test/resources/security/trust1.jks", - "src/test/resources/security/storepass", - true, - ciphers, - protocols); + NettyServerData d = new NettyServerData(new ServerContextBuilder() + .setTlsEnabled(true) + .setTlsCiphers("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256") + .setTlsProtocols("TLSv1.2") + .setKeystore("src/test/resources/security/s3.jks") + .setKeystorePasswordFile("src/test/resources/security/storepass") + .setTruststore("src/test/resources/security/trust1.jks") + .setTruststorePasswordFile("src/test/resources/security/storepass") + .setSaslPlainTextAuth(false) + .setPort(port) + .build()); return d; }, (port) -> { @@ -197,17 +204,17 @@ public void nettyTlsUnknownServer() throws Exception { public void nettyTlsUnknownClient() throws Exception { runWithBaseServer( (port) -> { - NettyServerData d = new NettyServerData(port); - String[] ciphers = {"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"}; - String[] protocols = {"TLSv1.2"}; - d.enableTls( - "src/test/resources/security/s1.jks", - "src/test/resources/security/storepass", - "src/test/resources/security/trust2.jks", - "src/test/resources/security/storepass", - true, - ciphers, - protocols); + NettyServerData d = new NettyServerData(new ServerContextBuilder() + .setTlsEnabled(true) + .setTlsCiphers("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256") + .setTlsProtocols("TLSv1.2") + .setKeystore("src/test/resources/security/s1.jks") + .setKeystorePasswordFile("src/test/resources/security/storepass") + .setTruststore("src/test/resources/security/trust2.jks") + .setTruststorePasswordFile("src/test/resources/security/storepass") + .setTlsMutualAuthEnabled(true) + .setPort(port) + .build()); return d; }, (port) -> { @@ -229,17 +236,16 @@ public void nettyTlsUnknownClient() throws Exception { public void nettyTlsUnknownClientNoMutualAuth() throws Exception { runWithBaseServer( (port) -> { - NettyServerData d = new NettyServerData(port); - String[] ciphers = {"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"}; - String[] protocols = {"TLSv1.2"}; - d.enableTls( - "src/test/resources/security/s1.jks", - "src/test/resources/security/storepass", - "src/test/resources/security/trust2.jks", - "src/test/resources/security/storepass", - false, - ciphers, - protocols); + NettyServerData d = new NettyServerData(new ServerContextBuilder() + .setTlsEnabled(true) + .setTlsCiphers("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256") + .setTlsProtocols("TLSv1.2") + .setKeystore("src/test/resources/security/s1.jks") + .setKeystorePasswordFile("src/test/resources/security/storepass") + .setTruststore("src/test/resources/security/trust2.jks") + .setTruststorePasswordFile("src/test/resources/security/storepass") + .setPort(port) + .build()); return d; }, (port) -> { @@ -263,18 +269,17 @@ public void nettySasl() throws Exception { (port) -> { System.setProperty("java.security.auth.login.config", "src/test/resources/security/corfudb_jaas.config"); - NettyServerData d = new NettyServerData(port); - String[] ciphers = {"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"}; - String[] protocols = {"TLSv1.2"}; - d.enableTls( - "src/test/resources/security/s1.jks", - "src/test/resources/security/storepass", - "src/test/resources/security/trust1.jks", - "src/test/resources/security/storepass", - true, - ciphers, - protocols); - d.enableSaslPlainTextAuth(); + NettyServerData d = new NettyServerData(new ServerContextBuilder() + .setTlsEnabled(true) + .setTlsCiphers("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256") + .setTlsProtocols("TLSv1.2") + .setKeystore("src/test/resources/security/s1.jks") + .setKeystorePasswordFile("src/test/resources/security/storepass") + .setTruststore("src/test/resources/security/trust1.jks") + .setTruststorePasswordFile("src/test/resources/security/storepass") + .setSaslPlainTextAuth(true) + .setPort(port) + .build()); return d; }, (port) -> { @@ -300,18 +305,17 @@ public void nettySaslWrongPassword() throws Exception { (port) -> { System.setProperty("java.security.auth.login.config", "src/test/resources/security/corfudb_jaas.config"); - NettyServerData d = new NettyServerData(port); - String[] ciphers = {"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"}; - String[] protocols = {"TLSv1.2"}; - d.enableTls( - "src/test/resources/security/s1.jks", - "src/test/resources/security/storepass", - "src/test/resources/security/trust1.jks", - "src/test/resources/security/storepass", - true, - ciphers, - protocols); - d.enableSaslPlainTextAuth(); + NettyServerData d = new NettyServerData(new ServerContextBuilder() + .setTlsEnabled(true) + .setTlsCiphers("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256") + .setTlsProtocols("TLSv1.2") + .setKeystore("src/test/resources/security/s1.jks") + .setKeystorePasswordFile("src/test/resources/security/storepass") + .setTruststore("src/test/resources/security/trust1.jks") + .setTruststorePasswordFile("src/test/resources/security/storepass") + .setSaslPlainTextAuth(true) + .setPort(port) + .build()); return d; }, (port) -> { @@ -348,8 +352,9 @@ public void testTlsUpdateClientTrust() throws Exception { * @throws Exception */ private void reloadedTrustManagerTestHelper(boolean replaceClientTrust) throws Exception { - NettyServerRouter serverRouter = new NettyServerRouter(new ImmutableMap.Builder().build()); - serverRouter.addServer(new BaseServer()); + NettyServerRouter serverRouter = + new NettyServerRouter(Collections.singletonList( + new BaseServer(ServerContextBuilder.emptyContext()))); int port = findRandomOpenPort(); File clientTrustNoServer = new File("src/test/resources/security/reload/client_trust_no_server.jks"); @@ -368,17 +373,19 @@ private void reloadedTrustManagerTestHelper(boolean replaceClientTrust) throws E } - NettyServerData serverData = new NettyServerData(port); - String[] ciphers = {"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"}; - String[] protocols = {"TLSv1.2"}; - serverData.enableTls( - "src/test/resources/security/reload/server_key.jks", - "src/test/resources/security/reload/password", - serverTrustFile.getAbsolutePath(), - "src/test/resources/security/reload/password", - true, - ciphers, - protocols); + NettyServerData serverData = new NettyServerData( + new ServerContextBuilder() + .setTlsEnabled(true) + .setTlsCiphers("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256") + .setTlsProtocols("TLSv1.2") + .setKeystore("src/test/resources/security/reload/server_key.jks") + .setKeystorePasswordFile("src/test/resources/security/reload/password") + .setTruststore(serverTrustFile.getAbsolutePath()) + .setTruststorePasswordFile("src/test/resources/security/reload/password") + .setTlsMutualAuthEnabled(true) + .setPort(port) + .build() + ); serverData.bootstrapServer(); @@ -416,8 +423,6 @@ void runWithBaseServer(NettyServerDataConstructor nsdc, NettyClientRouterConstructor ncrc, NettyCommFunction actionFn) throws Exception { - NettyServerRouter nsr = new NettyServerRouter(new ImmutableMap.Builder().build()); - nsr.addServer(new BaseServer()); int port = findRandomOpenPort(); NettyServerData d = nsdc.createNettyServerData(port); @@ -429,6 +434,8 @@ void runWithBaseServer(NettyServerDataConstructor nsdc, ncr.start(); actionFn.runTest(ncr, d); } catch (Exception ex) { + System.out.println("oops"); + ex.printStackTrace(); log.error("Exception ", ex); throw ex; } finally { @@ -460,112 +467,31 @@ public interface NettyCommFunction { @Data public class NettyServerData { ServerBootstrap b; - ChannelFuture f; - int port; + volatile ChannelFuture f; EventLoopGroup bossGroup; EventLoopGroup workerGroup; - EventExecutorGroup ee; - boolean tlsEnabled = false; - SslContext sslContext; - boolean tlsMutualAuthEnabled = false; - String[] enabledTlsCipherSuites; - String[] enabledTlsProtocols; - - boolean saslPlainTextAuthEnabled = false; - - public NettyServerData(int port) { - this.port = port; - } - - public void enableTls(String ksFile, String ksPasswordFile, String tsFile, String tsPasswordFile, - boolean mutualAuth, String[] ciphers, String[] protocols) throws Exception { - try { - this.sslContext = SslContextConstructor.constructSslContext(true, - ksFile, ksPasswordFile, tsFile, tsPasswordFile); - } catch (SSLException e) { - throw new RuntimeException(e); - } - this.tlsMutualAuthEnabled = mutualAuth; - this.enabledTlsCipherSuites = ciphers; - this.enabledTlsProtocols = protocols; - this.tlsEnabled = true; - } + final ServerContext serverContext; - public void enableSaslPlainTextAuth() { - this.saslPlainTextAuthEnabled = true; + public NettyServerData(@Nonnull ServerContext context) { + this.serverContext = context; } void bootstrapServer() throws Exception { - NettyServerRouter nsr = new NettyServerRouter(new ImmutableMap.Builder().build()); - bossGroup = new NioEventLoopGroup(1, new ThreadFactory() { - final AtomicInteger threadNum = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("accept-" + threadNum.getAndIncrement()); - return t; - } - }); - - workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2, new ThreadFactory() { - final AtomicInteger threadNum = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("io-" + threadNum.getAndIncrement()); - return t; - } - }); - - ee = new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors() * 2, new ThreadFactory() { - - final AtomicInteger threadNum = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("event-" + threadNum.getAndIncrement()); - return t; - } - }); - - final int SO_BACKLOG = 100; - final int FRAME_SIZE = 4; - b = new ServerBootstrap(); - b.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .option(ChannelOption.SO_BACKLOG, SO_BACKLOG) - .childOption(ChannelOption.SO_KEEPALIVE, true) - .childOption(ChannelOption.SO_REUSEADDR, true) - .childOption(ChannelOption.TCP_NODELAY, true) - .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .handler(new LoggingHandler(LogLevel.INFO)) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(io.netty.channel.socket.SocketChannel ch) throws Exception { - if (tlsEnabled) { - SSLEngine engine = sslContext.newEngine(ch.alloc()); - engine.setEnabledCipherSuites(enabledTlsCipherSuites); - engine.setEnabledProtocols(enabledTlsProtocols); - if (tlsMutualAuthEnabled) { - engine.setNeedClientAuth(true); - } - ch.pipeline().addLast("ssl", new SslHandler(engine)); - } - ch.pipeline().addLast(new LengthFieldPrepender(FRAME_SIZE)); - ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, FRAME_SIZE, 0, FRAME_SIZE)); - if (saslPlainTextAuthEnabled) { - ch.pipeline().addLast("sasl/plain-text", new PlainTextSaslNettyServer()); - } - ch.pipeline().addLast(ee, new NettyCorfuMessageDecoder()); - ch.pipeline().addLast(ee, new NettyCorfuMessageEncoder()); - ch.pipeline().addLast(ee, nsr); - } - }); - f = b.bind(port).sync(); + NettyServerRouter nsr = + new NettyServerRouter(Collections.singletonList(new BaseServer(serverContext))); + + bossGroup = CorfuServer.getBossGroup(serverContext); + workerGroup = CorfuServer.getWorkerGroup(serverContext); + f = CorfuServer.startAndListen(bossGroup, + workerGroup, + NioServerSocketChannel.class, + b -> CorfuServer.configureBootstrapOptions( + serverContext, b), + serverContext, + nsr, + serverContext.getServerConfig(Integer.class, + "")); } public void shutdownServer() { diff --git a/test/src/test/java/org/corfudb/runtime/clients/TestClientRouterTest.java b/test/src/test/java/org/corfudb/runtime/clients/TestClientRouterTest.java index ae1fa037059..b7a3763fd1f 100644 --- a/test/src/test/java/org/corfudb/runtime/clients/TestClientRouterTest.java +++ b/test/src/test/java/org/corfudb/runtime/clients/TestClientRouterTest.java @@ -2,6 +2,7 @@ import org.corfudb.AbstractCorfuTest; import org.corfudb.infrastructure.BaseServer; +import org.corfudb.infrastructure.ServerContextBuilder; import org.corfudb.infrastructure.TestServerRouter; import org.corfudb.protocols.wireprotocol.CorfuMsgType; import org.junit.Test; @@ -19,7 +20,7 @@ public class TestClientRouterTest extends AbstractCorfuTest { @Test public void testRuleDropsMessages() { TestServerRouter tsr = new TestServerRouter(); - BaseServer bs = new BaseServer(); + BaseServer bs = new BaseServer(ServerContextBuilder.defaultContext(0)); tsr.addServer(bs); TestClientRouter tcr = new TestClientRouter(tsr); @@ -40,7 +41,7 @@ public void testRuleDropsMessages() { @Test public void onlyDropEpochChangeMessages() { TestServerRouter tsr = new TestServerRouter(); - BaseServer bs = new BaseServer(); + BaseServer bs = new BaseServer(ServerContextBuilder.defaultContext(0)); tsr.addServer(bs); TestClientRouter tcr = new TestClientRouter(tsr); @@ -62,7 +63,7 @@ public void onlyDropEpochChangeMessages() { @Test public void doesNotUpdateEpochBackward() throws Exception { TestServerRouter tsr = new TestServerRouter(); - BaseServer bs = new BaseServer(); + BaseServer bs = new BaseServer(ServerContextBuilder.defaultContext(0)); tsr.addServer(bs); TestClientRouter tcr = new TestClientRouter(tsr); diff --git a/test/src/test/java/org/corfudb/runtime/view/AbstractViewTest.java b/test/src/test/java/org/corfudb/runtime/view/AbstractViewTest.java index c55cc9c08c4..c1e63bc935c 100644 --- a/test/src/test/java/org/corfudb/runtime/view/AbstractViewTest.java +++ b/test/src/test/java/org/corfudb/runtime/view/AbstractViewTest.java @@ -136,7 +136,9 @@ public void cleanupBuffers() { * @param config The configuration to use for the server. */ public void addServer(int port, Map config) { - addServer(port, new ServerContext(config, new TestServerRouter(port))); + ServerContext sc = new ServerContext(config); + sc.setServerRouter(new TestServerRouter(port)); + addServer(port, sc); } /** @@ -369,13 +371,17 @@ private static class TestServer { TestServer(Map optsMap) { - this(new ServerContext(optsMap, new TestServerRouter())); + this(new ServerContext(optsMap)); + serverContext.setServerRouter(new TestServerRouter()); } TestServer(ServerContext serverContext) { this.serverContext = serverContext; this.serverRouter = serverContext.getServerRouter(); - this.baseServer = new BaseServer(); + if (serverRouter == null) { + serverRouter = new TestServerRouter(); + } + this.baseServer = new BaseServer(serverContext); this.sequencerServer = new SequencerServer(serverContext); this.layoutServer = new LayoutServer(serverContext); this.logUnitServer = new LogUnitServer(serverContext); From b25e901f28b88c03fc754cafdd9eef18a310b808 Mon Sep 17 00:00:00 2001 From: Michael Wei Date: Mon, 11 Dec 2017 20:28:12 -0800 Subject: [PATCH 2/4] Missing file change --- .../src/test/java/org/corfudb/logReader/logReaderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logReader/src/test/java/org/corfudb/logReader/logReaderTest.java b/logReader/src/test/java/org/corfudb/logReader/logReaderTest.java index a5a01d1879f..d33b9d3597c 100644 --- a/logReader/src/test/java/org/corfudb/logReader/logReaderTest.java +++ b/logReader/src/test/java/org/corfudb/logReader/logReaderTest.java @@ -35,7 +35,7 @@ private ServerContext getContext() { configs.put("--log-path", LOG_BASE_PATH); configs.put("--no-verify", false); configs .put("--cache-heap-ratio", "0.5"); - return new ServerContext(configs, null); + return new ServerContext(configs); } @Before From 61b7705d91d9b0c87bbb8bb559b175e6270d5181 Mon Sep 17 00:00:00 2001 From: Michael Wei Date: Mon, 11 Dec 2017 20:41:16 -0800 Subject: [PATCH 3/4] missing javadoc --- .../src/main/java/org/corfudb/infrastructure/BaseServer.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/infrastructure/src/main/java/org/corfudb/infrastructure/BaseServer.java b/infrastructure/src/main/java/org/corfudb/infrastructure/BaseServer.java index 8d8aba2630f..53450005fd3 100644 --- a/infrastructure/src/main/java/org/corfudb/infrastructure/BaseServer.java +++ b/infrastructure/src/main/java/org/corfudb/infrastructure/BaseServer.java @@ -23,8 +23,13 @@ @Slf4j public class BaseServer extends AbstractServer { + /** A {@link ServerContext} used to serve responses. */ final ServerContext serverContext; + /** Construct a new {@link BaseServer} given a {@link ServerContext}. + * + * @param context The {@link ServerContext} to use. + */ public BaseServer(@Nonnull ServerContext context) { this.serverContext = context; } From 10c5ea5cddc3149d11843ce573fbeb02e5f768c8 Mon Sep 17 00:00:00 2001 From: Michael Wei Date: Mon, 11 Dec 2017 20:56:07 -0800 Subject: [PATCH 4/4] update missing imports --- .../src/main/java/org/corfudb/infrastructure/CorfuServer.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/infrastructure/src/main/java/org/corfudb/infrastructure/CorfuServer.java b/infrastructure/src/main/java/org/corfudb/infrastructure/CorfuServer.java index 432f87cbc78..8e36581b655 100644 --- a/infrastructure/src/main/java/org/corfudb/infrastructure/CorfuServer.java +++ b/infrastructure/src/main/java/org/corfudb/infrastructure/CorfuServer.java @@ -18,8 +18,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; -import io.netty.channel.epoll.Epoll; -import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -33,14 +31,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.corfudb.protocols.wireprotocol.NettyCorfuMessageDecoder; import org.corfudb.protocols.wireprotocol.NettyCorfuMessageEncoder;