Skip to content

Commit

Permalink
Handle exception thrown during init of CompactorService (#3555)
Browse files Browse the repository at this point in the history
When the optional variables in CompactorService are initialized, any exception thrown
is not handled, which results in the CompactorService thread being killed. As a result,
runOrchestrator is no longer invoked and the compactor never gets triggered.
This commit fixes the above issue.
  • Loading branch information
SravanthiAshokKumar committed Mar 17, 2023
1 parent 6f3a8af commit 8c52f90
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ public class CompactorService implements ManagementService {
private final InvokeCheckpointing checkpointerJvmManager;
private final CompactionTriggerPolicy compactionTriggerPolicy;

private Optional<CompactorLeaderServices> compactorLeaderServices = Optional.empty();
private Optional<CorfuStore> corfuStore = Optional.empty();
private Optional<DistributedCheckpointerHelper> distributedCheckpointerHelper = Optional.empty();
private Optional<CompactorLeaderServices> optionalCompactorLeaderServices = Optional.empty();
private Optional<CorfuStore> optionalCorfuStore = Optional.empty();
private Optional<DistributedCheckpointerHelper> optionalDistributedCheckpointerHelper = Optional.empty();
private TrimLog trimLog;
private final Logger log;
private static final Duration LIVENESS_TIMEOUT = Duration.ofMinutes(1);
Expand Down Expand Up @@ -79,7 +79,6 @@ public void start(Duration interval) {
return;
}

getCompactorLeaderServices();
this.trimLog = new TrimLog(getCorfuRuntime(), getCorfuStore());

orchestratorThread.scheduleWithFixedDelay(
Expand All @@ -92,36 +91,38 @@ public void start(Duration interval) {
}

@VisibleForTesting
public CompactorLeaderServices getCompactorLeaderServices() {
if (!compactorLeaderServices.isPresent()) {
public CompactorLeaderServices getCompactorLeaderServices() throws Exception {
if (!optionalCompactorLeaderServices.isPresent()) {
try {
compactorLeaderServices = Optional.of(new CompactorLeaderServices(getCorfuRuntime(),
optionalCompactorLeaderServices = Optional.of(new CompactorLeaderServices(getCorfuRuntime(),
serverContext.getLocalEndpoint(), getCorfuStore(),
new LivenessValidator(getCorfuRuntime(), getCorfuStore(), LIVENESS_TIMEOUT)));
} catch (Exception e) {
log.error("Unable to create CompactorLeaderServices object. Will retry on next attempt. Exception: ", e);
} catch (Exception ex) {
log.error("Unable to create CompactorLeaderServices object. Will retry on next attempt. Exception: ", ex);
throw ex;
}
}
return compactorLeaderServices.get();
return optionalCompactorLeaderServices.get();
}

@VisibleForTesting
public CorfuStore getCorfuStore() {
if (!this.corfuStore.isPresent()) {
this.corfuStore = Optional.of(new CorfuStore(getCorfuRuntime()));
if (!this.optionalCorfuStore.isPresent()) {
this.optionalCorfuStore = Optional.of(new CorfuStore(getCorfuRuntime()));
}
return this.corfuStore.get();
return this.optionalCorfuStore.get();
}

private DistributedCheckpointerHelper getDistributedCheckpointerHelper() {
if (!distributedCheckpointerHelper.isPresent()) {
private DistributedCheckpointerHelper getDistributedCheckpointerHelper() throws Exception {
if (!optionalDistributedCheckpointerHelper.isPresent()) {
try {
distributedCheckpointerHelper = Optional.of(new DistributedCheckpointerHelper(getCorfuStore()));
} catch (Exception e) {
log.error("Failed to obtain a DistributedCheckpointerHelper. Will retry on next attempt. Exception: ", e);
optionalDistributedCheckpointerHelper = Optional.of(new DistributedCheckpointerHelper(getCorfuStore()));
} catch (Exception ex) {
log.error("Failed to obtain a DistributedCheckpointerHelper. Will retry on next attempt. Exception: ", ex);
throw ex;
}
}
return distributedCheckpointerHelper.get();
return optionalDistributedCheckpointerHelper.get();
}

/**
Expand All @@ -135,50 +136,50 @@ private void runOrchestrator() {
boolean isLeader = isNodePrimarySequencer(updateLayoutAndGet());
log.trace("Current node isLeader: {}", isLeader);

CompactorLeaderServices compactorLeaderServices = getCompactorLeaderServices();

CheckpointingStatus managerStatus = null;
try (TxnContext txn = getCorfuStore().txn(CORFU_SYSTEM_NAMESPACE)) {
managerStatus = (CheckpointingStatus) txn.getRecord(
CompactorMetadataTables.COMPACTION_MANAGER_TABLE_NAME,
CompactorMetadataTables.COMPACTION_MANAGER_KEY).getPayload();
if (managerStatus == null && isLeader) {
txn.putRecord(getCompactorLeaderServices().getCompactorMetadataTables().getCompactionManagerTable(),
txn.putRecord(compactorLeaderServices.getCompactorMetadataTables().getCompactionManagerTable(),
CompactorMetadataTables.COMPACTION_MANAGER_KEY,
CheckpointingStatus.newBuilder().setStatus(StatusType.IDLE).setCycleCount(0).build(), null);
}
txn.commit();
} catch (Exception e) {
log.error("Unable to acquire manager status: ", e);
} catch (Exception ex) {
log.error("Unable to acquire manager status: ", ex);
return;
}
try {
if (isLeader) {
if (managerStatus != null && managerStatus.getStatus() == StatusType.STARTED) {
if (getDistributedCheckpointerHelper().isCompactionDisabled()) {
log.info("Compaction has been disabled. Force finish compaction cycle as it already started");
getCompactorLeaderServices().finishCompactionCycle();
} else {
getCompactorLeaderServices().validateLiveness();
}
} else if (compactionTriggerPolicy.shouldTrigger(
getCorfuRuntime().getParameters().getCheckpointTriggerFreqMillis(), getCorfuStore())) {
trimLog.invokePrefixTrim();
compactionTriggerPolicy.markCompactionCycleStart();
getCompactorLeaderServices().initCompactionCycle();
if (isLeader) {
if (managerStatus != null && managerStatus.getStatus() == StatusType.STARTED) {
if (getDistributedCheckpointerHelper().isCompactionDisabled()) {
log.info("Compaction has been disabled. Force finish compaction cycle as it already started");
compactorLeaderServices.finishCompactionCycle();
} else {
compactorLeaderServices.validateLiveness();
}
} else if (compactionTriggerPolicy.shouldTrigger(
getCorfuRuntime().getParameters().getCheckpointTriggerFreqMillis(), getCorfuStore())) {
trimLog.invokePrefixTrim();
compactionTriggerPolicy.markCompactionCycleStart();
compactorLeaderServices.initCompactionCycle();
}
if (managerStatus != null) {
if (managerStatus.getStatus() == StatusType.FAILED || managerStatus.getStatus() == StatusType.COMPLETED) {
checkpointerJvmManager.shutdown();
} else if (managerStatus.getStatus() == StatusType.STARTED && !checkpointerJvmManager.isRunning()
&& !checkpointerJvmManager.isInvoked()) {
checkpointerJvmManager.invokeCheckpointing();
}
}
if (managerStatus != null) {
if (managerStatus.getStatus() == StatusType.FAILED || managerStatus.getStatus() == StatusType.COMPLETED) {
checkpointerJvmManager.shutdown();
} else if (managerStatus.getStatus() == StatusType.STARTED && !checkpointerJvmManager.isRunning()
&& !checkpointerJvmManager.isInvoked()) {
checkpointerJvmManager.invokeCheckpointing();
}
} catch (Exception ex) {
log.warn("Exception in runOrcestrator(): ", ex);
}
} catch (Exception ex) {
log.error("Exception in runOrchestrator(): ", ex);
} catch (Throwable t) {
log.error("Encountered unexpected exception in runOrchestrator: ", t);
log.error("Encountered unexpected exception in runOrchestrator(): ", t);
throw t;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class CompactorServiceUnitTest {


@Before
public void setup() {
public void setup() throws Exception {

Map<String, Object> map = new HashMap<>();
map.put("<port>", "port");
Expand Down Expand Up @@ -114,6 +114,32 @@ public void runOrchestratorNonLeaderTest() {
verify(invokeCheckpointingJvm).invokeCheckpointing();
}

/**
 * Checks that the orchestrator keeps running on a non-leader node after
 * getCompactorLeaderServices() throws once.
 *
 * The first call to getCompactorLeaderServices() is stubbed to throw and later calls
 * return leaderServices. The test then expects the checkpointer JVM to be shut down
 * exactly once and checkpointing to be invoked once.
 */
@Test
public void runOrchestratorNonLeaderOnExceptionTest() throws Exception {
Layout mockLayout = mock(Layout.class);
when(corfuRuntime.invalidateLayout()).thenReturn(CompletableFuture.completedFuture(mockLayout));
//isLeader becomes false
when(mockLayout.getPrimarySequencer()).thenReturn(NODE_ENDPOINT + NODE_0);

// First call throws, every later call returns leaderServices.
when(compactorServiceSpy.getCompactorLeaderServices())
.thenThrow(new Exception("CompactorLeaderServices not initialized")).thenReturn(leaderServices);
// Manager status payload is FAILED on the first read and STARTED on later reads.
when((CheckpointingStatus) corfuStoreCompactionManagerEntry.getPayload())
.thenReturn(CheckpointingStatus.newBuilder().setStatus(StatusType.FAILED).build())
.thenReturn(CheckpointingStatus.newBuilder().setStatus(StatusType.STARTED).build());
// Checkpointer reports not running / not invoked on the first query, then running / invoked.
when(invokeCheckpointingJvm.isRunning()).thenReturn(false).thenReturn(true);
when(invokeCheckpointingJvm.isInvoked()).thenReturn(false).thenReturn(true);

compactorServiceSpy.start(Duration.ofSeconds(SCHEDULER_INTERVAL));
// Presumably long enough for several scheduled orchestrator runs; this depends on
// SCHEDULER_INTERVAL and SLEEP_WAIT, which are defined outside this view. TODO confirm.
try {
TimeUnit.SECONDS.sleep(SLEEP_WAIT);
} catch (InterruptedException e) {
// NOTE(review): the interrupt is only logged; the thread's interrupt flag is not restored.
log.warn(SLEEP_INTERRUPTED_EXCEPTION_MSG, e);
}

// Expect exactly one shutdown() and one invokeCheckpointing() despite the initial exception.
verify(invokeCheckpointingJvm, times(1)).shutdown();
verify(invokeCheckpointingJvm).invokeCheckpointing();
}

@Test
public void runOrchestratorLeaderTest() throws Exception {
Layout mockLayout = mock(Layout.class);
Expand Down

0 comments on commit 8c52f90

Please sign in to comment.