Skip to content

Commit

Permalink
Add Exception handler in CompactorService (#3456)
Browse files Browse the repository at this point in the history
Handle exception thrown while invoking DistributedCheckpointerHelper
in CompactorService
  • Loading branch information
SravanthiAshokKumar committed Dec 19, 2022
1 parent edda087 commit ce53a8a
Showing 1 changed file with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public class CompactorService implements ManagementService {

private Optional<CompactorLeaderServices> compactorLeaderServices = Optional.empty();
private Optional<CorfuStore> corfuStore = Optional.empty();
private Optional<DistributedCheckpointerHelper> distributedCheckpointerHelper = Optional.empty();
private TrimLog trimLog;
private DistributedCheckpointerHelper distributedCheckpointerHelper;
private final Logger log;
private static final Duration LIVENESS_TIMEOUT = Duration.ofMinutes(1);

Expand Down Expand Up @@ -81,7 +81,6 @@ public void start(Duration interval) {

getCompactorLeaderServices();
this.trimLog = new TrimLog(getCorfuRuntime(), getCorfuStore());
this.distributedCheckpointerHelper = new DistributedCheckpointerHelper(getCorfuStore());

orchestratorThread.scheduleWithFixedDelay(
() -> LambdaUtils.runSansThrow(this::runOrchestrator),
Expand Down Expand Up @@ -114,6 +113,17 @@ public CorfuStore getCorfuStore() {
return this.corfuStore.get();
}

private DistributedCheckpointerHelper getDistributedCheckpointerHelper() {
if (!distributedCheckpointerHelper.isPresent()) {
try {
distributedCheckpointerHelper = Optional.of(new DistributedCheckpointerHelper(getCorfuStore()));
} catch (Exception e) {
log.error("Failed to obtain a DistributedCheckpointerHelper. Will retry on next attempt. Exception: ", e);
}
}
return distributedCheckpointerHelper.get();
}

/**
* Invokes the CorfuStoreCompactor jvm based on the status of CompactionManager
* Additionally, If the current node is the leader,
Expand Down Expand Up @@ -142,7 +152,7 @@ private void runOrchestrator() {
try {
if (isLeader) {
if (managerStatus != null && managerStatus.getStatus() == StatusType.STARTED) {
if (distributedCheckpointerHelper.isCompactionDisabled()) {
if (getDistributedCheckpointerHelper().isCompactionDisabled()) {
log.info("Compaction has been disabled. Force finish compaction cycle as it already started");
getCompactorLeaderServices().finishCompactionCycle();
} else {
Expand Down

0 comments on commit ce53a8a

Please sign in to comment.