Skip to content

Commit

Permalink
Correctly seal servers on epoch change (#1545)
Browse files Browse the repository at this point in the history
* Seal and flush on epoch change.

* Correctly seal servers and flush operations on epoch change
  • Loading branch information
WenbinZhu authored and Maithem committed Nov 13, 2018
1 parent 03eb8e1 commit 2bdbfe0
Show file tree
Hide file tree
Showing 25 changed files with 292 additions and 131 deletions.
2 changes: 1 addition & 1 deletion corfu_scripts/corfu_layouts.clj
Expand Up @@ -34,7 +34,7 @@ Options:
[layout]
; For now, we start at rank 0, but we really should get the highest rank proposed
(.setEpoch layout (inc (.getEpoch layout)))
(.moveServersToEpoch layout)
(.sealMinServerSet layout)
(loop [layout-rank 0]
(when (> layout-rank -1)
(do
Expand Down
Expand Up @@ -28,11 +28,22 @@ public AbstractServer() {
shutdown = false;
}

/** Get the message handler for this instance.
* @return A message handler.
/**
* Get the message handler for this instance.
*
* @return A message handler.
*/
public abstract CorfuMsgHandler getHandler();

/**
* Seal the server with the epoch.
*
* @param epoch Epoch to seal with
*/
public void sealServerWithEpoch(long epoch) {
// Overridden in log unit to flush operations stamped with an old epoch
}

public boolean isServerReadyToHandleMsg(CorfuMsg msg) {
// Overridden in sequencer to mark ready/not-ready state.
return true;
Expand Down
Expand Up @@ -31,8 +31,8 @@ public BaseServer(@Nonnull ServerContext context) {
private final CorfuMsgHandler handler =
CorfuMsgHandler.generateHandler(MethodHandles.lookup(), this);


/** Respond to a ping message.
/**
* Respond to a ping message.
*
* @param msg The incoming message
* @param ctx The channel context
Expand All @@ -43,7 +43,8 @@ private static void ping(CorfuMsg msg, ChannelHandlerContext ctx, IServerRouter
r.sendResponse(ctx, msg, CorfuMsgType.PONG.msg());
}

/** Respond to a version request message.
/**
* Respond to a version request message.
*
* @param msg The incoming message
* @param ctx The channel context
Expand All @@ -58,6 +59,7 @@ private void getVersion(CorfuMsg msg, ChannelHandlerContext ctx, IServerRouter r

/**
* Respond to a epoch change message.
* This method also executes sealing logic on each individual server type.
*
* @param msg The incoming message
* @param ctx The channel context
Expand All @@ -68,9 +70,10 @@ public synchronized void handleMessageSetEpoch(@NonNull CorfuPayloadMsg<Long> ms
ChannelHandlerContext ctx,
@NonNull IServerRouter r) {
try {
log.info("handleMessageSetEpoch: Received SET_EPOCH, moving to new epoch {}",
msg.getPayload());
serverContext.setServerEpoch(msg.getPayload(), r);
long epoch = msg.getPayload();
log.info("handleMessageSetEpoch: Received SET_EPOCH, moving to new epoch {}", epoch);
serverContext.setServerEpoch(epoch, r);
serverContext.getServers().forEach(s -> s.sealServerWithEpoch(epoch));
r.sendResponse(ctx, msg, new CorfuMsg(CorfuMsgType.ACK));
} catch (WrongEpochException e) {
log.debug("handleMessageSetEpoch: Rejected SET_EPOCH current={}, requested={}",
Expand All @@ -80,7 +83,8 @@ public synchronized void handleMessageSetEpoch(@NonNull CorfuPayloadMsg<Long> ms
}
}

/** Reset the JVM. This mechanism leverages that corfu_server runs in a bash script
/**
* Reset the JVM. This mechanism leverages that corfu_server runs in a bash script
* which monitors the exit code of Corfu. If the exit code is 100, then it resets
* the server and DELETES ALL EXISTING DATA.
*
Expand All @@ -95,7 +99,8 @@ private void doReset(CorfuMsg msg, ChannelHandlerContext ctx, IServerRouter r) {
CorfuServer.restartServer(serverContext, true);
}

/** Restart the JVM. This mechanism leverages that corfu_server runs in a bash script
/**
* Restart the JVM. This mechanism leverages that corfu_server runs in a bash script
* which monitors the exit code of Corfu. If the exit code is 200, then it restarts
* the server.
*
Expand Down
Expand Up @@ -45,24 +45,22 @@ public class BatchWriter<K, V> implements CacheWriter<K, V>, AutoCloseable {
.build());

/**
* The epochWaterMark is the epoch up to which all operations have been sealed. Any
* BatchWriterOperation arriving after the epochWaterMark with an epoch less than the epochWaterMark, is
* completed exceptionally with a WrongEpochException.
* The sealEpoch is the epoch up to which all operations have been sealed. Any
* BatchWriterOperation arriving after the sealEpoch with an epoch less than the sealEpoch
* is completed exceptionally with a WrongEpochException.
* This is persisted in the ServerContext by the LogUnitServer to withstand restarts.
*/
private volatile long epochWaterMark;
private long sealEpoch;

/**
* Returns a new BatchWriter for a stream log.
*
* @param streamLog stream log for writes (can be in memory or file)
* @param epochWaterMark All operations stamped with epoch less than the epochWaterMark are
* discarded.
* @param streamLog stream log for writes (can be in memory or file)
* @param doSync If true, the batch writer will sync writes to secondary storage
* @param streamLog Stream log for writes (can be in memory or file)
* @param sealEpoch All operations stamped with epoch less than the sealEpoch are discarded
* @param doSync If true, the batch writer will sync writes to secondary storage
*/
public BatchWriter(StreamLog streamLog, long epochWaterMark, boolean doSync) {
this.epochWaterMark = epochWaterMark;
public BatchWriter(StreamLog streamLog, long sealEpoch, boolean doSync) {
this.sealEpoch = sealEpoch;
this.doSync = doSync;
this.streamLog = streamLog;
operationsQueue = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -136,22 +134,17 @@ public void prefixTrim(@Nonnull long address, @Nonnull long epoch) {
}

/**
* Insert epochWaterMark in queue and wait for queue to process all preceding operations.
* All operations in the queue after the epochWaterMark operation, that have epoch less than
* the epochWaterMark epoch are discarded and their futures are completed exceptionally with a
* WrongEpochException. The epochWaterMark is used in case of:
* Reset - All operations need to be flushed before a reset and no new operations for the
* previous epoch should be accepted.
* SealAndFlush - Similarly on a seal, all operations need to be flushed before a seal and no
* new operations for the previous epoch should be accepted.
* Insert seal operation in queue and wait for queue to process all preceding operations.
* All operations in the queue after the sealEpoch operation, that have epoch less than
* the sealEpoch epoch are discarded and their futures are completed exceptionally with a
* WrongEpochException.
*
* @param epoch Epoch to epochWaterMark with.
* @param epoch Epoch to seal with.
*/
public void waitForEpochWaterMark(@Nonnull long epoch) {
public void waitForSealComplete(long epoch) {
try {
CompletableFuture<Void> cf = new CompletableFuture<>();
epochWaterMark = epoch;
operationsQueue.add(new BatchWriterOperation(Type.EPOCH_WATER_MARK, null, null, epoch, null, cf));
operationsQueue.add(new BatchWriterOperation(Type.SEAL, null, null, epoch, null, cf));
cf.get();
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -192,7 +185,7 @@ private void batchWriteProcessor() {
try {
BatchWriterOperation lastOp = null;
int processed = 0;
List<BatchWriterOperation> res = new LinkedList();
List<BatchWriterOperation> res = new LinkedList<>();

while (true) {
BatchWriterOperation currOp;
Expand All @@ -217,13 +210,12 @@ private void batchWriteProcessor() {

if (currOp == null) {
lastOp = null;
continue;
} else if (currOp == BatchWriterOperation.SHUTDOWN) {
log.trace("Shutting down the write processor");
streamLog.sync(true);
break;
} else if (currOp.getEpoch() < epochWaterMark) {
currOp.setException(new WrongEpochException(epochWaterMark));
} else if (currOp.getEpoch() < sealEpoch) {
currOp.setException(new WrongEpochException(sealEpoch));
res.add(currOp);
processed++;
lastOp = currOp;
Expand All @@ -246,7 +238,8 @@ private void batchWriteProcessor() {
streamLog.append(currOp.getEntries());
res.add(currOp);
break;
case EPOCH_WATER_MARK:
case SEAL:
sealEpoch = currOp.getEpoch();
res.add(currOp);
break;
case RESET:
Expand Down
Expand Up @@ -19,7 +19,7 @@ public enum Type {
RANGE_WRITE,
TRIM,
PREFIX_TRIM,
EPOCH_WATER_MARK,
SEAL,
RESET
}

Expand Down
Expand Up @@ -284,7 +284,7 @@ public static void startServer(Map<String, Object> opts, boolean bindToAllInterf
serverContext.setBindToAllInterfaces(bindToAllInterfaces);

// Register shutdown handler
shutdownThread = new Thread(() -> cleanShutdown(router));
shutdownThread = new Thread(() -> cleanShutdown(servers));
shutdownThread.setName("ShutdownThread");
Runtime.getRuntime().addShutdownHook(shutdownThread);

Expand Down Expand Up @@ -487,7 +487,7 @@ public static void restartServer(ServerContext serverContext, boolean resetData)
final boolean bindToAllInterfaces = serverContext.isBindToAllInterfaces();

corfuServerThread = new Thread(() -> {
cleanShutdown((NettyServerRouter) serverContext.getServerRouter());
cleanShutdown(serverContext.getServers());
if (resetData && !(Boolean) serverContext.getServerConfig().get("--memory")) {
File serviceDir = new File((String) serverContext.getServerConfig()
.get("--log-path"));
Expand Down Expand Up @@ -521,10 +521,8 @@ public static void restartServer(ServerContext serverContext, boolean resetData)
/**
* Attempt to cleanly shutdown all the servers.
*/
public static void cleanShutdown(@Nonnull NettyServerRouter router) {
public static void cleanShutdown(@Nonnull List<AbstractServer> servers) {
log.info("CleanShutdown: Starting Cleanup.");
// Create a list of servers
final List<AbstractServer> servers = router.getServers();

// A executor service to create the shutdown threads
// plus name the threads correctly.
Expand Down
Expand Up @@ -3,6 +3,8 @@
import io.netty.channel.ChannelHandlerContext;
import org.corfudb.protocols.wireprotocol.CorfuMsg;

import java.util.List;

/**
* Created by mwei on 12/13/15.
*/
Expand All @@ -24,4 +26,9 @@ public interface IServerRouter {
* @param server The server to route messages to
*/
void addServer(AbstractServer server);

/**
* Get a list of registered servers.
*/
List<AbstractServer> getServers();
}
Expand Up @@ -113,10 +113,9 @@ public LogUnitServer(ServerContext serverContext) {
streamLog = new StreamLogFiles(serverContext, (Boolean) opts.get("--no-verify"));
}


batchWriter = new BatchWriter<>(
streamLog,
serverContext.getLogUnitEpochWaterMark(),
serverContext.getServerEpoch(),
!((Boolean) opts.get("--no-sync"))
);

Expand Down Expand Up @@ -285,25 +284,37 @@ private void rangeWrite(CorfuPayloadMsg<RangeWriteMsg> msg,
r.sendResponse(ctx, msg, CorfuMsgType.WRITE_OK.msg());
}

/**
* Seal the server with the epoch.
*
* - A seal operation is inserted in the queue and then we wait to flush all operations
* in the queue before this operation.
* - All operations after this operation but stamped with an older epoch will be failed.
*/
@Override
public void sealServerWithEpoch(long epoch) {
batchWriter.waitForSealComplete(epoch);
log.info("LogUnit sealServerWithEpoch: sealed and flushed with epoch {}", epoch);
}

/**
* Resets the log unit server via the BatchWriter.
* Warning: Clears all data.
* - The epochWaterMark is persisted to withstand restarts.
* - The epochWaterMark is inserted in the queue and then we wait to flush all operations in
* the queue before this operation.
*
* - The epochWaterMark is set to prevent resetting log unit multiple times during
* same epoch.
* - After this the reset operation is inserted which resets and clears all data.
* - Finally the cache is invalidated to purge the existing entries.
*/
@ServerHandler(type = CorfuMsgType.RESET_LOGUNIT)
private synchronized void resetLogUnit(CorfuPayloadMsg<Long> msg,
ChannelHandlerContext ctx, IServerRouter r) {
// Check if the reset request is with an epoch greater than the last reset epoch seen.
// and should be equal to the current router epoch to prevent stale reset requests from
// wiping out the data.
// Check if the reset request is with an epoch greater than the last reset epoch seen to
// prevent multiple reset in the same epoch. and should be equal to the current router
// epoch to prevent stale reset requests from wiping out the data.
if (msg.getPayload() > serverContext.getLogUnitEpochWaterMark()
&& msg.getPayload() == serverContext.getServerEpoch()) {
serverContext.setLogUnitEpochWaterMark(msg.getPayload());
batchWriter.waitForEpochWaterMark(msg.getPayload());
batchWriter.reset(msg.getPayload());
dataCache.invalidateAll();
log.info("LogUnit Server Reset.");
Expand All @@ -329,7 +340,6 @@ public synchronized ILogData handleRetrieval(long address) {
return entry;
}


public synchronized void handleEviction(long address, ILogData entry, RemovalCause cause) {
log.trace("Eviction[{}]: {}", address, cause);
streamLog.release(address, (LogData) entry);
Expand Down
Expand Up @@ -725,7 +725,7 @@ private void correctOutOfPhaseEpochs(PollReport pollReport) {

// Re-seal all servers with the latestLayout epoch.
// This has no effect on up-to-date servers. Only the trailing servers are caught up.
getCorfuRuntime().getLayoutView().getRuntimeLayout(layout).moveServersToEpoch();
getCorfuRuntime().getLayoutView().getRuntimeLayout(layout).sealMinServerSet();

// Check if any layout server has a stale layout.
// If yes patch it (commit) with the latestLayout (received from quorum).
Expand Down
@@ -1,9 +1,11 @@
package org.corfudb.infrastructure;

import com.google.common.collect.ImmutableList;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
Expand All @@ -24,9 +26,7 @@
*/
@Slf4j
@ChannelHandler.Sharable
public class NettyServerRouter extends ChannelInboundHandlerAdapter
implements IServerRouter {

public class NettyServerRouter extends ChannelInboundHandlerAdapter implements IServerRouter {

/**
* This map stores the mapping from message type to netty server handler.
Expand All @@ -41,7 +41,6 @@ public class NettyServerRouter extends ChannelInboundHandlerAdapter
volatile long serverEpoch;

/** The {@link AbstractServer}s this {@link NettyServerRouter} routes messages for. */
@Getter
final List<AbstractServer> servers;

/** Construct a new {@link NettyServerRouter}.
Expand All @@ -52,7 +51,7 @@ public class NettyServerRouter extends ChannelInboundHandlerAdapter
public NettyServerRouter(List<AbstractServer> servers) {
// Initialize the router epoch from the persisted server epoch
this.serverEpoch = ((BaseServer) servers.get(0)).serverContext.getServerEpoch();
this.servers = servers;
this.servers = ImmutableList.copyOf(servers);
handlerMap = new EnumMap<>(CorfuMsgType.class);
servers.forEach(server -> server.getHandler().getHandledTypes()
.forEach(x -> handlerMap.put(x, server)));
Expand All @@ -70,6 +69,14 @@ public void addServer(AbstractServer server) {
throw new UnsupportedOperationException("No longer supported");
}

/**
* {@inheritDoc}
*/
@Override
public List<AbstractServer> getServers() {
return servers;
}

/**
* Send a netty message through this router, setting the fields in the outgoing message.
*
Expand Down Expand Up @@ -125,8 +132,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (validateEpoch(m, ctx)) {
// Route the message to the handler.
if (log.isTraceEnabled()) {
log.trace("Message routed to {}: {}", handler.getClass().getSimpleName(),
msg);
log.trace("Message routed to {}: {}", handler.getClass().getSimpleName(), msg);
}

handler.getExecutor().submit(() -> {
Expand Down

0 comments on commit 2bdbfe0

Please sign in to comment.