Skip to content

Commit

Permalink
Keep trying to generate nodeId in case of DataCorruptionException (#3580
Browse files Browse the repository at this point in the history
)
  • Loading branch information
xnull committed Apr 5, 2023
1 parent de580ef commit 844e4a8
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.corfudb.common.util.URLUtils.NetworkInterfaceVersion;
import org.corfudb.infrastructure.health.HealthMonitor;
import org.corfudb.infrastructure.logreplication.infrastructure.CorfuInterClusterReplicationServer;
import org.corfudb.runtime.exceptions.DataCorruptionException;
import org.corfudb.runtime.exceptions.unrecoverable.UnrecoverableCorfuError;
import org.corfudb.util.GitRepositoryState;
import org.docopt.Docopt;
Expand All @@ -21,6 +22,7 @@
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.corfudb.util.NetworkUtils.getAddressFromInterfaceName;

Expand Down Expand Up @@ -217,6 +219,8 @@ public class CorfuServer {
+ " --version "
+ " Show version\n";

private static final Duration TIMEOUT = Duration.ofSeconds(3);

// Active Corfu Server.
private static volatile CorfuServerNode activeServer;

Expand All @@ -240,7 +244,7 @@ public class CorfuServer {
*
* @param args command line argument strings
*/
public static void main(String[] args) {
public static void main(String[] args) throws Exception {
try {
// Parse the options given, using docopt.
Map<String, Object> opts = new Docopt(USAGE)
Expand Down Expand Up @@ -286,7 +290,7 @@ public static void configureHealthMonitor(Map<String, Object> opts) {
}
}

private static void startServer(Map<String, Object> opts) {
private static void startServer(Map<String, Object> opts) throws InterruptedException {

// Print a nice welcome message.
printStartupMsg(opts);
Expand All @@ -296,6 +300,62 @@ private static void startServer(Map<String, Object> opts) {

// Bind to all interfaces only if no address or interface specified by the user.
// Fetch the address if given a network interface.
configureNetwork(opts);

log.info("Configured Corfu Server address: {}", opts.get(ADDRESS_PARAM));
createServiceDirectory(opts);
checkMetaDataRetention(opts);
registerShutdownHandler();

// Manages the lifecycle of the Corfu Server.
while (!shutdownServer) {
ServerContext serverContext;
try {
serverContext = new ServerContext(opts);
} catch (DataCorruptionException ex) {
log.error("Failed creating server context", ex);
TimeUnit.SECONDS.sleep(TIMEOUT.getSeconds());
continue;
}

try {
configureMetrics(opts, serverContext.getLocalEndpoint());
configureHealthMonitor(opts);
activeServer = new CorfuServerNode(serverContext);
activeServer.startAndListen();
} catch (Throwable th) {
log.error("CorfuServer: Server exiting due to unrecoverable error: ", th);
System.exit(EXIT_ERROR_CODE);
}

if (cleanupServer) {
clearDataFiles(serverContext);
cleanupServer = false;
}

if (!shutdownServer) {
log.info("main: Server restarting.");
}
}

log.info("main: Server exiting due to shutdown");
}

private static void registerShutdownHandler() {
// Register shutdown handler
Thread shutdownThread = new Thread(CorfuServer::cleanShutdown);
shutdownThread.setName("ShutdownThread");
Runtime.getRuntime().addShutdownHook(shutdownThread);
}

private static void checkMetaDataRetention(Map<String, Object> opts) {
// Check the specified number of datastore files to retain
if (Integer.parseInt((String) opts.get("--metadata-retention")) < 1) {
throw new IllegalArgumentException("Max number of metadata files to retain must be greater than 0.");
}
}

private static void configureNetwork(Map<String, Object> opts) {
if (opts.get("--network-interface") != null) {
opts.put(
ADDRESS_PARAM,
Expand Down Expand Up @@ -323,44 +383,6 @@ private static void startServer(Map<String, Object> opts) {
// Address is specified by the user.
opts.put("--bind-to-all-interfaces", false);
}
log.info("Configured Corfu Server address: {}", opts.get(ADDRESS_PARAM));

createServiceDirectory(opts);

// Check the specified number of datastore files to retain
if (Integer.parseInt((String) opts.get("--metadata-retention")) < 1) {
throw new IllegalArgumentException("Max number of metadata files to retain must be greater than 0.");
}

// Register shutdown handler
Thread shutdownThread = new Thread(CorfuServer::cleanShutdown);
shutdownThread.setName("ShutdownThread");
Runtime.getRuntime().addShutdownHook(shutdownThread);

// Manages the lifecycle of the Corfu Server.
while (!shutdownServer) {
final ServerContext serverContext = new ServerContext(opts);
try {
configureMetrics(opts, serverContext.getLocalEndpoint());
configureHealthMonitor(opts);
activeServer = new CorfuServerNode(serverContext);
activeServer.startAndListen();
} catch (Throwable th) {
log.error("CorfuServer: Server exiting due to unrecoverable error: ", th);
System.exit(EXIT_ERROR_CODE);
}

if (cleanupServer) {
clearDataFiles(serverContext);
cleanupServer = false;
}

if (!shutdownServer) {
log.info("main: Server restarting.");
}
}

log.info("main: Server exiting due to shutdown");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import lombok.extern.slf4j.Slf4j;
import org.corfudb.common.config.ConfigParamNames;
import org.corfudb.common.metrics.micrometer.MeterRegistryProvider;
import org.corfudb.infrastructure.ManagementServer.ManagementServerInitializer;
import org.corfudb.infrastructure.health.HealthMonitor;
import org.corfudb.infrastructure.health.HttpServerInitializer;
import org.corfudb.protocols.wireprotocol.NettyCorfuMessageDecoder;
Expand Down Expand Up @@ -55,7 +56,7 @@ public class CorfuServerNode implements AutoCloseable {
private final ServerContext serverContext;

@Getter
private final Map<Class, AbstractServer> serverMap;
private final Map<Class<?>, AbstractServer> serverMap;

@Getter
private final NettyServerRouter router;
Expand All @@ -73,15 +74,13 @@ public class CorfuServerNode implements AutoCloseable {
* @param serverContext Initialized Server Context.
*/
public CorfuServerNode(@Nonnull ServerContext serverContext) {
this(serverContext,
ImmutableMap.<Class, AbstractServer>builder()
.put(BaseServer.class, new BaseServer(serverContext))
.put(SequencerServer.class, new SequencerServer(serverContext))
.put(LayoutServer.class, new LayoutServer(serverContext))
.put(LogUnitServer.class, new LogUnitServer(serverContext))
.put(ManagementServer.class, new ManagementServer(serverContext,
new ManagementServer.ManagementServerInitializer()))
.build()
this(serverContext, ImmutableMap.<Class<?>, AbstractServer>builder()
.put(BaseServer.class, new BaseServer(serverContext))
.put(SequencerServer.class, new SequencerServer(serverContext))
.put(LayoutServer.class, new LayoutServer(serverContext))
.put(LogUnitServer.class, new LogUnitServer(serverContext))
.put(ManagementServer.class, new ManagementServer(serverContext, new ManagementServerInitializer()))
.build()
);
}

Expand All @@ -92,16 +91,15 @@ public CorfuServerNode(@Nonnull ServerContext serverContext) {
* @param serverMap Server Map with all components.
*/
public CorfuServerNode(@Nonnull ServerContext serverContext,
@Nonnull ImmutableMap<Class, AbstractServer> serverMap) {
@Nonnull ImmutableMap<Class<?>, AbstractServer> serverMap) {
this.serverContext = serverContext;
this.serverMap = serverMap;
router = new NettyServerRouter(serverMap.values().asList(), serverContext);
this.serverContext.setServerRouter(router);
// If the node is started in the single node setup and was bootstrapped,
// set the server epoch as well.
if (serverContext.isSingleNodeSetup() && serverContext.getCurrentLayout() != null) {
serverContext.setServerEpoch(serverContext.getCurrentLayout().getEpoch(),
router);
serverContext.setServerEpoch(serverContext.getCurrentLayout().getEpoch(), router);
}
this.close = new AtomicBoolean(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void delete(@Nonnull String key,
private <T> T load(Class<T> tClass, String key) {
try {
Path path = Paths.get(logDirPath, key + EXTENSION);
if (Files.notExists(path)) {
if (!Files.isReadable(path)) {
return null;
}
byte[] bytes = Files.readAllBytes(path);
Expand All @@ -192,7 +192,7 @@ private <T> T load(Class<T> tClass, String key) {
String json = new String(bytes, 4, bytes.length - 4);
return JsonUtils.parser.fromJson(json, tClass);
} catch (IOException e) {
throw new RuntimeException(e);
throw new DataCorruptionException(e);
}
}

Expand Down

0 comments on commit 844e4a8

Please sign in to comment.