Skip to content

Commit

Permalink
Create new runtime for CompactorService (#3611)
Browse files Browse the repository at this point in the history
* Create new runtime for CompactorService

The CompactorService needs to register a systemDownHandler where the
runtime is reinitialized on an unrecoverable exception. Since the CompactorService
uses the same runtime as the other services part of the management server, we
need an independent runtime for CompactorService.

* Handle concurrency on systemDownHanlder in CompactorService

* Make systemDownHandlerForCompactor non-recursive and deadlock free

* Fix flaky CompactorServiceIT test
  • Loading branch information
SravanthiAshokKumar committed May 11, 2023
1 parent 4eca583 commit 86403de
Show file tree
Hide file tree
Showing 10 changed files with 327 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ public class CompactorBaseConfig {
public static final int DEFAULT_CP_MAX_WRITE_SIZE = 25 << 20;
public static final int SYSTEM_DOWN_HANDLER_TRIGGER_LIMIT = 100; // Corfu default is 20
public static final int CORFU_LOG_CHECKPOINT_ERROR = 3;
private static final int SYSTEM_EXIT_ERROR_CODE = -3;
public static final String USAGE = "Usage: compactor-runner";
public static final String OPTIONS = "Options:\n";

private final Runnable defaultSystemDownHandler = () -> {
throw new UnreachableClusterException("Cluster is unavailable");
log.error("Exiting since the SystemDownHandler is invoked after " + SYSTEM_DOWN_HANDLER_TRIGGER_LIMIT + " retries.");
System.exit(SYSTEM_EXIT_ERROR_CODE);
};

private Map<String, Object> opts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public CompactorLeaderServices(CorfuRuntime corfuRuntime, String nodeEndpoint, C
this.nodeEndpoint = nodeEndpoint;
this.corfuStore = corfuStore;
this.livenessValidator = livenessValidator;
this.trimLog = new TrimLog(corfuRuntime, corfuStore);
this.trimLog = new TrimLog();
this.log = LoggerFactory.getLogger("compactor-leader");
}

Expand Down Expand Up @@ -268,7 +268,7 @@ private void deleteInstantKeyIfPresent() {
txn.delete(CompactorMetadataTables.COMPACTION_CONTROLS_TABLE, CompactorMetadataTables.INSTANT_TIGGER_WITH_TRIM);
txn.commit();
log.info("Invoking trimlog() due to InstantTrigger with trim found");
trimLog.invokePrefixTrim();
trimLog.invokePrefixTrim(corfuRuntime, corfuStore);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@
import org.corfudb.runtime.DistributedCheckpointerHelper;
import org.corfudb.runtime.collections.CorfuStore;
import org.corfudb.runtime.collections.TxnContext;
import org.corfudb.runtime.exceptions.UnreachableClusterException;
import org.corfudb.runtime.exceptions.unrecoverable.UnrecoverableCorfuError;
import org.corfudb.runtime.view.Layout;
import org.corfudb.util.LambdaUtils;
import org.corfudb.util.concurrent.SingletonResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static org.corfudb.runtime.view.TableRegistry.CORFU_SYSTEM_NAMESPACE;
Expand All @@ -35,36 +37,82 @@
public class CompactorService implements ManagementService {

private final ServerContext serverContext;
private final SingletonResource<CorfuRuntime> runtimeSingletonResource;
private final ScheduledExecutorService orchestratorThread;
private final Duration triggerInterval;
private final InvokeCheckpointing checkpointerJvmManager;
private final CompactionTriggerPolicy compactionTriggerPolicy;

private Optional<CompactorLeaderServices> optionalCompactorLeaderServices = Optional.empty();
private Optional<CorfuStore> optionalCorfuStore = Optional.empty();
private Optional<DistributedCheckpointerHelper> optionalDistributedCheckpointerHelper = Optional.empty();
private TrimLog trimLog;
private final ScheduledExecutorService orchestratorThread;
private final TrimLog trimLog;
private final CorfuRuntime.CorfuRuntimeParameters corfuRuntimeParameters;
private volatile Optional<CompactorLeaderServices> optionalCompactorLeaderServices = Optional.empty();
private volatile Optional<CorfuStore> optionalCorfuStore = Optional.empty();
private volatile Optional<DistributedCheckpointerHelper> optionalDistributedCheckpointerHelper = Optional.empty();
private volatile Optional<CorfuRuntime> corfuRuntimeOptional = Optional.empty();
private volatile ScheduledFuture<?> scheduledFuture;
private final Logger log;
private static final Duration LIVENESS_TIMEOUT = Duration.ofMinutes(1);
private static final int SYSTEM_DOWN_HANDLER_TRIGGER_LIMIT = 60;

CompactorService(@NonNull ServerContext serverContext,
@NonNull SingletonResource<CorfuRuntime> runtimeSingletonResource,
public CompactorService(@NonNull ServerContext serverContext,
@NonNull Duration triggerInterval,
@NonNull InvokeCheckpointing checkpointerJvmManager,
@NonNull CompactionTriggerPolicy compactionTriggerPolicy) {
this.serverContext = serverContext;
this.runtimeSingletonResource = runtimeSingletonResource;

this.triggerInterval = triggerInterval;
this.checkpointerJvmManager = checkpointerJvmManager;
this.compactionTriggerPolicy = compactionTriggerPolicy;
this.orchestratorThread = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("Cmpt-" + serverContext.getServerConfig().get("<port>") + "-chkpter")
.build());
this.checkpointerJvmManager = checkpointerJvmManager;
this.compactionTriggerPolicy = compactionTriggerPolicy;
this.trimLog = new TrimLog();
this.corfuRuntimeParameters = serverContext.getManagementRuntimeParameters();
this.corfuRuntimeParameters.setSystemDownHandlerTriggerLimit(SYSTEM_DOWN_HANDLER_TRIGGER_LIMIT);
this.log = LoggerFactory.getLogger("compactor-leader");
}

@VisibleForTesting
public Runnable getSystemDownHandlerForCompactor(CorfuRuntime runtime) {
return () -> {
log.warn("CorfuRuntime for CompactorService stalled. Invoking systemDownHandler after {} "
+ "unsuccessful tries.", SYSTEM_DOWN_HANDLER_TRIGGER_LIMIT);
//Since start doesn't initiate a runtime, on concurrent calls to SystemDownHandler,
//the following if condition can possibly invoked by all calls. It's ok to have multiple
//calls to shutdown and start.
synchronized (this) {
if (!corfuRuntimeOptional.isPresent() || runtime == corfuRuntimeOptional.get()) {
shutdown();
start(this.triggerInterval);
}
}
throw new UnreachableClusterException("CorfuRuntime for CompactorService stalled. Invoked systemDownHandler after "
+ SYSTEM_DOWN_HANDLER_TRIGGER_LIMIT + " unsuccessful tries.");
};
}

@VisibleForTesting
public CorfuRuntime getNewCorfuRuntime() {
final CorfuRuntime runtime = CorfuRuntime.fromParameters(this.corfuRuntimeParameters);
runtime.getParameters().setSystemDownHandler(getSystemDownHandlerForCompactor(runtime));
try {
final Layout managementLayout = serverContext.copyManagementLayout();
if (managementLayout != null) {
managementLayout.getLayoutServers().forEach(runtime::addLayoutServer);
}
runtime.connect();
} catch (UnrecoverableCorfuError er) {
log.error("Unable to connect to server due to UnrecoverableCorfuError: ", er);
runtime.getParameters().getSystemDownHandler().run();
throw er;
}
log.info("getNewCorfuRuntime: Corfu Runtime connected successfully");
return runtime;
}

private CorfuRuntime getCorfuRuntime() {
return runtimeSingletonResource.get();
if (!corfuRuntimeOptional.isPresent()) {
corfuRuntimeOptional = Optional.of(getNewCorfuRuntime());
}
return corfuRuntimeOptional.get();
}

/**
Expand All @@ -73,20 +121,25 @@ private CorfuRuntime getCorfuRuntime() {
* @param interval interval to run the service
*/
@Override
public void start(Duration interval) {
public synchronized void start(Duration interval) {
log.info("Starting Compaction service...");
if (getCorfuRuntime().getParameters().getCheckpointTriggerFreqMillis() <= 0) {
//Do not initialize runtime here - Initializing runtime here could cause a recursive call
//to systemDownHandlerForCompactor and also get the compactor thread into a deadlock scenario.
//This can happen if the connect() method in getNewCorfuRuntime(), invokes the
//systemDownHandlerForCompactor
if (this.corfuRuntimeParameters.getCheckpointTriggerFreqMillis() <= 0) {
log.warn("CheckpointTriggerFreqMillis should be > 0");
return;
}

this.trimLog = new TrimLog(getCorfuRuntime(), getCorfuStore());

orchestratorThread.scheduleWithFixedDelay(
() -> LambdaUtils.runSansThrow(this::runOrchestrator),
interval.toMillis(),
interval.toMillis(),
TimeUnit.MILLISECONDS
);
if (scheduledFuture == null || scheduledFuture.isDone() || scheduledFuture.isCancelled()) {
scheduledFuture = orchestratorThread.scheduleWithFixedDelay(
() -> LambdaUtils.runSansThrow(this::runOrchestrator),
interval.toMillis(),
interval.toMillis(),
TimeUnit.MILLISECONDS
);
}
HealthMonitor.resolveIssue(Issue.createInitIssue(Component.COMPACTOR));
}

Expand Down Expand Up @@ -162,8 +215,8 @@ private void runOrchestrator() {
compactorLeaderServices.validateLiveness();
}
} else if (compactionTriggerPolicy.shouldTrigger(
getCorfuRuntime().getParameters().getCheckpointTriggerFreqMillis(), getCorfuStore())) {
trimLog.invokePrefixTrim();
this.corfuRuntimeParameters.getCheckpointTriggerFreqMillis(), getCorfuStore())) {
trimLog.invokePrefixTrim(getCorfuRuntime(), getCorfuStore());
compactionTriggerPolicy.markCompactionCycleStart();
compactorLeaderServices.initCompactionCycle();
}
Expand All @@ -179,8 +232,8 @@ private void runOrchestrator() {
} catch (Exception ex) {
log.error("Exception in runOrchestrator(): ", ex);
} catch (Throwable t) {
log.error("Encountered unexpected exception in runOrchestrator(): ", t);
throw t;
log.error("Unexpected throwable encountered in runOrchestrator(): ", t);
getCorfuRuntime().getParameters().getSystemDownHandler().run();
}
}

Expand All @@ -198,9 +251,18 @@ private boolean isNodePrimarySequencer(Layout layout) {
* Clean up.
*/
@Override
public void shutdown() {
public synchronized void shutdown() {
checkpointerJvmManager.shutdown();
orchestratorThread.shutdownNow();
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
if (corfuRuntimeOptional.isPresent()) {
corfuRuntimeOptional.get().shutdown();
corfuRuntimeOptional = Optional.empty();
}
optionalCorfuStore = Optional.empty();
optionalCompactorLeaderServices = Optional.empty();
optionalDistributedCheckpointerHelper = Optional.empty();
log.info("Compactor Orchestrator service shutting down.");
HealthMonitor.reportIssue(Issue.createInitIssue(Component.COMPACTOR));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public class ManagementAgent {
HealthMonitor.reportIssue(Issue.createInitIssue(Component.COMPACTOR));
}

this.compactorService = new CompactorService(serverContext, runtimeSingletonResource,
this.compactorService = new CompactorService(serverContext, TRIGGER_INTERVAL,
new InvokeCheckpointingJvm(serverContext), new DynamicTriggerPolicy());

// Creating the initialization task thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,13 @@
import static org.corfudb.runtime.view.TableRegistry.CORFU_SYSTEM_NAMESPACE;

public class TrimLog {
private final CorfuRuntime corfuRuntime;
private final CorfuStore corfuStore;
private final Logger log;

TrimLog(CorfuRuntime corfuRuntime, CorfuStore corfuStore) {
this.corfuStore = corfuStore;
this.corfuRuntime = corfuRuntime;
TrimLog() {
this.log = LoggerFactory.getLogger("compactor-leader");
}

private Optional<Long> getTrimAddress() {
private Optional<Long> getTrimAddress(CorfuStore corfuStore) {
Optional<Long> trimAddress = Optional.empty();
try (TxnContext txn = corfuStore.txn(CORFU_SYSTEM_NAMESPACE)) {
CheckpointingStatus managerStatus = (CheckpointingStatus) txn.getRecord(
Expand All @@ -49,8 +45,8 @@ private Optional<Long> getTrimAddress() {
/**
* Perform log-trimming on CorfuDB
*/
public void invokePrefixTrim() {
Optional<Long> trimAddress = getTrimAddress();
public void invokePrefixTrim(CorfuRuntime corfuRuntime, CorfuStore corfuStore) {
Optional<Long> trimAddress = getTrimAddress(corfuStore);
if (!trimAddress.isPresent()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private Table<TableName, ActiveCPStreamMsg, Message> getActiveCheckpointsTable()
@Override
public void start() {
executorService.scheduleWithFixedDelay(
() -> LambdaUtils.runSansThrow(this::updateHeartbeat),
this::updateHeartbeat,
UPDATE_INTERVAL.toMillis() / 2,
UPDATE_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
}
Expand Down Expand Up @@ -86,6 +86,8 @@ protected void updateHeartbeat() {
}
} catch (Exception e) {
log.warn("Unable to update liveness for table: {} due to exception: {}", table, e.getStackTrace());
} catch (Throwable t) {
corfuStore.getRuntime().getParameters().getSystemDownHandler().run();
}
}

Expand Down
4 changes: 3 additions & 1 deletion runtime/src/main/java/org/corfudb/runtime/CorfuRuntime.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.corfudb.runtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.micrometer.core.instrument.Timer;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -1139,7 +1140,8 @@ public synchronized CompletableFuture<Layout> invalidateLayout() {
* @param layout The layout to check.
* @throws WrongClusterException If the layout belongs to the wrong cluster.
*/
private void checkClusterId(@Nonnull Layout layout) {
@VisibleForTesting
public void checkClusterId(@Nonnull Layout layout) {
// We haven't adopted a clusterId yet.
if (clusterId == null) {
clusterId = layout.getClusterId();
Expand Down

0 comments on commit 86403de

Please sign in to comment.