Skip to content

Commit

Permalink
Merge 10c5ea5 into a4cac0c
Browse files Browse the repository at this point in the history
  • Loading branch information
no2chem committed Dec 12, 2017
2 parents a4cac0c + 10c5ea5 commit eb0ec63
Show file tree
Hide file tree
Showing 15 changed files with 420 additions and 435 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.HashMap;
import java.util.Map;

import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -22,10 +23,16 @@
@Slf4j
public class BaseServer extends AbstractServer {

/** Options map, if available. */
@Getter
@Setter
public Map<String, Object> optionsMap = new HashMap<>();
/** A {@link ServerContext} used to serve responses. */
final ServerContext serverContext;

/** Construct a new {@link BaseServer} given a {@link ServerContext}.
*
* @param context The {@link ServerContext} to use.
*/
public BaseServer(@Nonnull ServerContext context) {
this.serverContext = context;
}

/** Handler for the base server. */
@Getter
Expand Down Expand Up @@ -55,7 +62,7 @@ private static void ping(CorfuMsg msg, ChannelHandlerContext ctx,
@ServerHandler(type = CorfuMsgType.VERSION_REQUEST, opTimer = metricsPrefix + "version-request")
private void getVersion(CorfuMsg msg, ChannelHandlerContext ctx,
IServerRouter r, boolean isMetricsEnabled) {
VersionInfo vi = new VersionInfo(optionsMap);
VersionInfo vi = new VersionInfo(serverContext.getServerConfig());
r.sendResponse(ctx, msg, new JSONPayloadMsg<>(vi, CorfuMsgType.VERSION_RESPONSE));
}

Expand Down
379 changes: 201 additions & 178 deletions infrastructure/src/main/java/org/corfudb/infrastructure/CorfuServer.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ public synchronized void handleMessageLayoutBootstrap(
msg.getPayload().getLayout(), msg);
setCurrentLayout(msg.getPayload().getLayout());
serverContext.setServerEpoch(getCurrentLayout().getEpoch());
r.setServerEpoch(getCurrentLayout().getEpoch());
//send a response that the bootstrap was successful.
r.sendResponse(ctx, msg, new CorfuMsg(CorfuMsgType.ACK));
} else {
Expand Down Expand Up @@ -207,7 +208,8 @@ public synchronized void handleMessageSetEpoch(@NonNull CorfuPayloadMsg<Long> ms
if (msg.getPayload() >= serverEpoch) {
log.info("handleMessageSetEpoch: Received SET_EPOCH, moving to new epoch {}",
msg.getPayload());
setServerEpoch(msg.getPayload());
serverContext.setServerEpoch(msg.getPayload());
r.setServerEpoch(msg.getPayload());
r.sendResponse(ctx, msg, new CorfuMsg(CorfuMsgType.ACK));
} else {
log.debug("handleMessageSetEpoch: Rejected SET_EPOCH current={}, requested={}",
Expand Down Expand Up @@ -364,7 +366,8 @@ public synchronized void handleMessageLayoutCommit(
}

setCurrentLayout(commitLayout);
setServerEpoch(msg.getPayload().getEpoch());
serverContext.setServerEpoch(msg.getPayload().getEpoch());
r.setServerEpoch(msg.getPayload().getEpoch());
r.sendResponse(ctx, msg, new CorfuMsg(CorfuMsgType.ACK));
}

Expand Down Expand Up @@ -409,10 +412,6 @@ public void setLayoutInHistory(Layout layout) {
.getEpoch()), layout);
}

private void setServerEpoch(long serverEpoch) {
serverContext.setServerEpoch(serverEpoch);
}

private long getServerEpoch() {
return serverContext.getServerEpoch();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -86,9 +88,7 @@ protected static void handleUncaughtException(Thread t, @Nonnull Throwable e) {
/**
* This map stores the mapping from message type to netty server handler.
*/
Map<CorfuMsgType, AbstractServer> handlerMap;

BaseServer baseServer;
private final Map<CorfuMsgType, AbstractServer> handlerMap;

/**
* The epoch of this router. This is managed by the base server implementation.
Expand All @@ -97,41 +97,32 @@ protected static void handleUncaughtException(Thread t, @Nonnull Throwable e) {
@Setter
long serverEpoch;

/**
* Returns a new NettyServerRouter.
* @param opts map of options (FIXME: unused)
*/
public NettyServerRouter(Map<String, Object> opts) {
handlerMap = new ConcurrentHashMap<>();
baseServer = new BaseServer();
addServer(baseServer);
}
/** The {@link AbstractServer}s this {@link NettyServerRouter} routes messages for. */
@Getter
final List<AbstractServer> servers;

/**
* Add a new netty server handler to the router.
/** Construct an new {@link NettyServerRouter}.
*
* @param server The server to add.
* @param servers A list of {@link AbstractServer}s this router will route
* messages for.
*/
public void addServer(AbstractServer server) {
// Iterate through all types of CorfuMsgType, registering the handler
server.getHandler().getHandledTypes()
.forEach(x -> {
handlerMap.put(x, server);
log.trace("Registered {} to handle messages of type {}", server, x);
});
public NettyServerRouter(List<AbstractServer> servers) {
this.servers = servers;
handlerMap = new EnumMap<>(CorfuMsgType.class);
servers.forEach(server -> server.getHandler().getHandledTypes()
.forEach(x -> handlerMap.put(x, server)));
}

/**
* Remove a server from the router.
* @param server server to remove
* {@inheritDoc}
*
* <p>This operation is no longer supported. The router will only route messages for
* servers provided at construction time.
*/
public void removeServer(AbstractServer server) {
// Iterate through all types of CorfuMsgType, un-registering the handler
server.getHandler().getHandledTypes()
.forEach(x -> {
handlerMap.remove(x, server);
log.trace("Un-Registered {} to handle messages of type {}", server, x);
});
@Override
@Deprecated
public void addServer(AbstractServer server) {
throw new UnsupportedOperationException("No longer supported");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class ServerContext {
private final DataStore dataStore;

@Getter
@Setter
private IServerRouter serverRouter;

@Getter
Expand All @@ -68,12 +69,10 @@ public class ServerContext {
/**
* Returns a new ServerContext.
* @param serverConfig map of configuration strings to objects
* @param serverRouter server router
*/
public ServerContext(Map<String, Object> serverConfig, IServerRouter serverRouter) {
public ServerContext(Map<String, Object> serverConfig) {
this.serverConfig = serverConfig;
this.dataStore = new DataStore(serverConfig);
this.serverRouter = serverRouter;
this.failureDetectorPolicy = new PeriodicPollPolicy();
this.failureHandlerPolicy = new ConservativeFailureHandlerPolicy();

Expand All @@ -88,6 +87,18 @@ public ServerContext(Map<String, Object> serverConfig, IServerRouter serverRoute
}
}

/** Get a field from the server configuration map.
*
* @param type The type of the field.
* @param optionName The name of the option to retrieve.
* @param <T> The type of the field to return.
* @return The field with the give option name.
*/
@SuppressWarnings("unchecked")
public <T> T getServerConfig(Class<T> type, String optionName) {
return (T) getServerConfig().get(optionName);
}

/**
* The epoch of this router. This is managed by the base server implementation.
*/
Expand All @@ -102,9 +113,6 @@ public long getServerEpoch() {
*/
public void setServerEpoch(long serverEpoch) {
dataStore.put(Long.class, PREFIX_EPOCH, KEY_EPOCH, serverEpoch);
// Set the epoch in the router as well.
//TODO need to figure out if we can remove this redundancy
serverRouter.setServerEpoch(serverEpoch);
}

public long getTailSegment() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private ServerContext getContext() {
configs.put("--log-path", LOG_BASE_PATH);
configs.put("--no-verify", false);
configs .put("--cache-heap-ratio", "0.5");
return new ServerContext(configs, null);
return new ServerContext(configs);
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class BaseServerTest extends AbstractServerTest {
@Override
public AbstractServer getDefaultServer() {
if (bs == null) {
bs = new BaseServer();
bs = new BaseServer(ServerContextBuilder.defaultContext(0));
}
return bs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public ManagementServer getDefaultServer() {
.build();
// Required for management server to fetch layout.
router.addServer(new LayoutServer(serverContext));
router.addServer(new BaseServer());
router.addServer(new BaseServer(ServerContextBuilder.defaultContext(0)));
// Required to fetch global tails while handling failures.
router.addServer(new LogUnitServer(serverContext));
// Required for management server to bootstrap during initialization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,17 @@ public class ServerContextBuilder {
boolean memory = true;
String logPath = null;
boolean noVerify = false;

boolean tlsEnabled = false;
boolean tlsMutualAuthEnabled = false;
String tlsProtocols = "";
String tlsCiphers = "";
String keystore = "";
String keystorePasswordFile = "";
boolean saslPlainTextAuth = false;
String truststore = "";
String truststorePasswordFile = "";

String cacheSizeHeapRatio = "0.5";
String address = "test";
int port = 9000;
Expand Down Expand Up @@ -49,12 +59,24 @@ public ServerContext build() {
.put("--address", address)
.put("--cache-heap-ratio", cacheSizeHeapRatio)
.put("--enable-tls", tlsEnabled)
.put("--enable-tls-mutual-auth", tlsMutualAuthEnabled)
.put("--tls-protocols", tlsProtocols)
.put("--tls-ciphers", tlsCiphers)
.put("--keystore", keystore)
.put("--keystore-password-file", keystorePasswordFile)
.put("--truststore", truststore)
.put("--truststore-password-file", truststorePasswordFile)
.put("--enable-sasl-plain-text-auth", saslPlainTextAuth)
.put("<port>", port);
return new ServerContext(builder.build(), serverRouter);
ServerContext sc = new ServerContext(builder.build());
sc.setServerRouter(serverRouter);
return sc;
}

public static ServerContext defaultContext(int port) {
return new ServerContextBuilder().setPort(port).build();
ServerContext sc = new ServerContextBuilder().setPort(port).build();
sc.setServerRouter(new TestServerRouter());
return sc;
}

public static ServerContext emptyContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.ImmutableSet;
import org.corfudb.infrastructure.AbstractServer;
import org.corfudb.infrastructure.BaseServer;
import org.corfudb.infrastructure.ServerContextBuilder;
import org.corfudb.util.CFUtils;
import org.junit.Test;

Expand All @@ -18,7 +19,7 @@ public class BaseClientTest extends AbstractClientTest {
@Override
Set<AbstractServer> getServersForTest() {
return new ImmutableSet.Builder<AbstractServer>()
.add(new BaseServer())
.add(new BaseServer(ServerContextBuilder.defaultContext(0)))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Set<AbstractServer> getServersForTest() {
// Required for management server to be able to bootstrap the sequencer.
.add(new SequencerServer(serverContext))
.add(new LogUnitServer(serverContext))
.add(new BaseServer())
.add(new BaseServer(ServerContextBuilder.defaultContext(0)))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.corfudb.runtime.clients;

import org.corfudb.infrastructure.ServerContextBuilder;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -13,7 +14,7 @@ public void doesNotUpdateEpochBackward() throws Exception{

runWithBaseServer(
(port) -> {
return new NettyServerData(port);
return new NettyServerData(ServerContextBuilder.defaultContext(port));
},
(port) -> {
return new NettyClientRouter("localhost", port);
Expand Down

0 comments on commit eb0ec63

Please sign in to comment.