Skip to content

Commit

Permalink
Modify and split CompactorMain into logical classes (#3509)
Browse files Browse the repository at this point in the history
* Modify and split CompactorMain into logical classes

Split CorfuStoreCompactorMain into CorfuCompactorCheckpointer and CorfuCompactorControls
-The CorfuCompactorControls class holds all the controls related paramters and performs
required operation accordingly
-The CorfuCompactorCheckpointer starts checkpointing tables if there is an ongoing
compaction cycle

* Fix codacy issues and modify compactor_runner as required

* Simplify the compactor class names
  • Loading branch information
SravanthiAshokKumar committed Feb 3, 2023
1 parent a571d68 commit d86ad96
Show file tree
Hide file tree
Showing 8 changed files with 335 additions and 252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,60 +13,49 @@
import java.util.Map;
import java.util.Optional;

/**
* CorfuCompactorConfig class parses the parameters passed and builds the required CorfuRuntime
*/
@Getter
@Slf4j
public class CorfuStoreCompactorConfig {
public class CompactorBaseConfig {

// Reduce checkpoint batch size due to disk-based nature and for smaller compactor JVM size
public static final int DISK_BACKED_DEFAULT_CP_MAX_WRITE_SIZE = 1 << 20;
public static final int DEFAULT_CP_MAX_WRITE_SIZE = 25 << 20;
public static final int SYSTEM_DOWN_HANDLER_TRIGGER_LIMIT = 100; // Corfu default is 20
public static final int CORFU_LOG_CHECKPOINT_ERROR = 3;
public static final String USAGE = "Usage: compactor-runner";
public static final String OPTIONS = "Options:\n";

private final Runnable defaultSystemDownHandler = () -> {
throw new UnreachableClusterException("Cluster is unavailable");
};

private final Map<String, Object> opts;
private final CorfuRuntimeParameters params;
private final NodeLocator nodeLocator;
private final Optional<String> persistedCacheRoot;
private final boolean startCheckpointing;
private final boolean upgradeDescriptorTable;
private final boolean instantTriggerCompaction;
private final boolean trim;
private final boolean freezeCompaction;
private final boolean unfreezeCompaction;
private final boolean disableCompaction;
private final boolean enableCompaction;

public CorfuStoreCompactorConfig(String[] args) {
this.opts = parseOpts(args);
private Map<String, Object> opts;
private CorfuRuntimeParameters params;
private NodeLocator nodeLocator;
private Optional<String> persistedCacheRoot;

public CompactorBaseConfig(String[] args, String additionalUsageParams, String additionalOptionsParams) {
parse(args, additionalUsageParams, additionalOptionsParams);
buildRuntime();
}

public void parse(String[] args, String additionalUsageParams, String additionalOptionsParams) {
this.opts = new Docopt(cmdLineUsageBuilder(additionalUsageParams, additionalOptionsParams))
.withVersion(GitRepositoryState.getRepositoryState().describe)
.parse(args);
}

public void buildRuntime() {
String host = getOpt("--hostname").orElseThrow(() -> new IllegalStateException("Empty host"));
int port = getOpt("--port").map(Integer::parseInt)
.orElseThrow(() -> new IllegalStateException("Port not defined"));

this.nodeLocator = NodeLocator.builder().host(host).port(port).build();

persistedCacheRoot = getOpt("--persistedCacheRoot");
startCheckpointing = getOpt("--startCheckpointing").isPresent();
upgradeDescriptorTable = getOpt("--upgradeDescriptorTable").isPresent();
instantTriggerCompaction = getOpt("--instantTriggerCompaction").isPresent();
trim = getOpt("--trim").isPresent();
freezeCompaction = getOpt("--freezeCompaction").isPresent();
unfreezeCompaction = getOpt("--unfreezeCompaction").isPresent();
disableCompaction = getOpt("--disableCompaction").isPresent();
enableCompaction = getOpt("--enableCompaction").isPresent();

if (freezeCompaction && unfreezeCompaction) {
log.error("Both freeze and unfreeze compaction parameters cannot be passed together");
throw new IllegalArgumentException("Both freeze and unfreeze compaction parameters cannot be passed together");
}
if (disableCompaction && enableCompaction) {
log.error("Both enable and disable compaction parameters cannot be passed together");
throw new IllegalArgumentException("Both enable and disable compaction parameters cannot be passed together");
}

CorfuRuntimeParametersBuilder builder = CorfuRuntimeParameters.builder();

Expand Down Expand Up @@ -94,7 +83,6 @@ public CorfuStoreCompactorConfig(String[] args) {
maxWriteSize = DISK_BACKED_DEFAULT_CP_MAX_WRITE_SIZE;
}
}

builder.maxWriteSize(maxWriteSize);

getOpt("--bulkReadSize").ifPresent(bulkReadSizeStr -> {
Expand All @@ -111,39 +99,37 @@ public CorfuStoreCompactorConfig(String[] args) {
.build();
}

private Map<String, Object> parseOpts(String[] args) {
return new Docopt(CompactorCmdLineHelper.USAGE)
.withVersion(GitRepositoryState.getRepositoryState().describe)
.parse(args);
}

private Optional<String> getOpt(String param) {
Optional<String> getOpt(String param) {
if (opts.get(param) != null) {
return Optional.of(opts.get(param).toString());
} else {
return Optional.empty();
}
}

private String cmdLineUsageBuilder(String additionalUsageParams, String additionalOptionsParams) {
String usage = USAGE +
CompactorCmdLineHelper.USAGE_PARAMS +
additionalUsageParams +
"\n" +
OPTIONS +
CompactorCmdLineHelper.OPTIONS_PARAMS + "\n" +
additionalOptionsParams;
return usage;
}

public static class CompactorCmdLineHelper {
public static final String USAGE = "Usage: corfu-compactor --hostname=<host> " +
"--port=<port>" +
public static final String USAGE_PARAMS = " --hostname=<host> " +
"--port=<port> " +
"[--keystore=<keystore_file>] [--ks_password=<keystore_password>] " +
"[--truststore=<truststore_file>] [--truststore_password=<truststore_password>] " +
"[--persistedCacheRoot=<pathToTempDirForLargeTables>] " +
"[--maxWriteSize=<maxWriteSizeLimit>] " +
"[--bulkReadSize=<bulkReadSize>] " +
"[--trim=<trim>] " +
"[--startCheckpointing=<startCheckpointing>] " +
"[--upgradeDescriptorTable=<upgradeDescriptorTable>] " +
"[--instantTriggerCompaction=<instantTriggerCompaction>] " +
"[--freezeCompaction=<freezeCompaction>] " +
"[--unfreezeCompaction=<unfreezeCompaction>] " +
"[--disableCompaction=<disableCompaction>] " +
"[--enableCompaction=<enableCompaction>] " +
"[--tlsEnabled=<tls_enabled>]\n"
+ "Options:\n"
+ "--hostname=<hostname> Hostname\n"
"[--tlsEnabled=<tls_enabled>]";

public static final String OPTIONS_PARAMS =
"--hostname=<hostname> Hostname\n"
+ "--port=<port> Port\n"
+ "--keystore=<keystore_file> KeyStore File\n"
+ "--ks_password=<keystore_password> KeyStore Password\n"
Expand All @@ -152,14 +138,6 @@ public static class CompactorCmdLineHelper {
+ "--persistedCacheRoot=<pathToTempDirForLargeTables> Path to Temp Dir\n"
+ "--maxWriteSize=<maxWriteSize> Max write size smaller than 2GB\n"
+ "--bulkReadSize=<bulkReadSize> Read size for chain replication\n"
+ "--trim=<trim> Should trim be performed along with instantTrigger\n"
+ "--startCheckpointing=<startCheckpointing> Start checkpointing if compaction cycle has started\n"
+ "--upgradeDescriptorTable=<upgradeDescriptorTable> Repopulate descriptor table?\n"
+ "--instantTriggerCompaction=<instantTriggerCompaction> If compactor cycle needs to be triggered instantly\n"
+ "--freezeCompaction=<freezeCompaction> If compaction needs to be frozen\n"
+ "--unfreezeCompaction=<unfreezeCompaction> If compaction needs to be resumed\n"
+ "--disableCompaction=<disableCompaction> If compaction needs to be disabled\n"
+ "--enableCompaction=<enableCompaction> If compaction needs to be enabled\n"
+ "--tlsEnabled=<tls_enabled>";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package org.corfudb.compactor;

import lombok.extern.slf4j.Slf4j;

import org.corfudb.runtime.CheckpointerBuilder;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.DistributedCheckpointer;
import org.corfudb.runtime.DistributedCheckpointerHelper;
import org.corfudb.runtime.ServerTriggeredCheckpointer;
import org.corfudb.runtime.collections.CorfuStore;

import java.util.Optional;
import java.util.concurrent.TimeUnit;

/**
* Invokes the startCheckpointing() method of DistributedCompactor to checkpoint tables that weren't
* checkpointed by other clients
*/
@Slf4j
public class CompactorCheckpointer {
private final DistributedCheckpointerHelper distributedCheckpointerHelper;
private final DistributedCheckpointer distributedCheckpointer;
public static final int RETRY_CHECKPOINTING = 5;
private static final int RETRY_CHECKPOINTING_SLEEP_SECOND = 10;

public CompactorCheckpointer(String[] args) throws Exception {
CompactorBaseConfig config = new CompactorBaseConfig(args, "", "");

Thread.currentThread().setName("Cmpt-chkpter-" + config.getNodeLocator().getPort());
CorfuRuntime cpRuntime = (CorfuRuntime.fromParameters(
config.getParams())).parseConfigurationString(config.getNodeLocator().toEndpointUrl()).connect();
CorfuRuntime corfuRuntime = (CorfuRuntime.fromParameters(
config.getParams())).parseConfigurationString(config.getNodeLocator().toEndpointUrl()).connect();
CorfuStore corfuStore = new CorfuStore(corfuRuntime);

this.distributedCheckpointerHelper = new DistributedCheckpointerHelper(corfuStore);
this.distributedCheckpointer = new ServerTriggeredCheckpointer(CheckpointerBuilder.builder()
.corfuRuntime(corfuRuntime)
.cpRuntime(Optional.of(cpRuntime))
.persistedCacheRoot(config.getPersistedCacheRoot())
.isClient(false)
.build(), corfuStore, distributedCheckpointerHelper.getCompactorMetadataTables());
}

public CompactorCheckpointer(DistributedCheckpointerHelper distributedCheckpointerHelper,
DistributedCheckpointer distributedCheckpointer) {
this.distributedCheckpointerHelper = distributedCheckpointerHelper;
this.distributedCheckpointer = distributedCheckpointer;
}

/**
* Entry point to invoke checkpointing by the client jvm
*
* @param args command line argument strings
*/
public static void main(String[] args) {
try {
CompactorCheckpointer corfuCompactorMain = new CompactorCheckpointer(args);
corfuCompactorMain.startCheckpointing();
} catch (Exception e) {
log.error("CorfuStoreCompactorMain crashed with error: {}, Exception: ",
CompactorBaseConfig.CORFU_LOG_CHECKPOINT_ERROR, e);
}
log.info("Exiting CorfuStoreCompactor");
}

protected void startCheckpointing() {
try {
for (int i = 0; i < RETRY_CHECKPOINTING; i++) {
if (distributedCheckpointerHelper.hasCompactionStarted()) {
distributedCheckpointer.checkpointTables();
break;
}
log.info("Compaction cycle hasn't started yet...");
TimeUnit.SECONDS.sleep(RETRY_CHECKPOINTING_SLEEP_SECOND);
}
} catch (InterruptedException ie) {
log.error("Sleep interrupted with exception: ", ie);
} catch (Exception e) {
log.error("Exception during checkpointing: {}, StackTrace: {}", e.getMessage(), e.getStackTrace());
} finally {
distributedCheckpointer.shutdown();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package org.corfudb.compactor;

import lombok.extern.slf4j.Slf4j;

import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.DistributedCheckpointerHelper;
import org.corfudb.runtime.collections.CorfuStore;

/**
* Helps with inserting the required compactor controls key into the CompactionControlsTable
* These keys are used by the manager to make decisions regarding triggering the next cycle
*/
@Slf4j
public class CompactorController {
private final CorfuRuntime corfuRuntime;
private final CorfuStore corfuStore;
private final CompactorControllerConfig config;
private final DistributedCheckpointerHelper distributedCheckpointerHelper;
public CompactorController(String[] args) throws Exception {
this.config = new CompactorControllerConfig(args);

Thread.currentThread().setName("Cmpt-ctrls-" + config.getNodeLocator().getPort());
this.corfuRuntime = (CorfuRuntime.fromParameters(
config.getParams())).parseConfigurationString(config.getNodeLocator().toEndpointUrl()).connect();
this.corfuStore = new CorfuStore(corfuRuntime);

this.distributedCheckpointerHelper = new DistributedCheckpointerHelper(corfuStore);
}

public CompactorController(CompactorControllerConfig config, CorfuRuntime corfuRuntime, CorfuStore corfuStore,
DistributedCheckpointerHelper distributedCheckpointerHelper) {
this.config = config;
this.corfuRuntime = corfuRuntime;
this.corfuStore = corfuStore;
this.distributedCheckpointerHelper = distributedCheckpointerHelper;
}

/**
* Entry point to invoke compactor controls operations
*
* @param args command line argument strings
*/
public static void main(String[] args) {
try {
CompactorController corfuStoreCompactorControls = new CompactorController(args);
corfuStoreCompactorControls.doCompactorAction();
} catch (Exception e) {
log.error("CorfuStoreCompactorMain crashed with error: {}, Exception: ",
CompactorBaseConfig.CORFU_LOG_CHECKPOINT_ERROR, e);
}
log.info("Exiting CorfuStoreCompactor");
}

protected void doCompactorAction() {
if (config.isFreezeCompaction() || config.isDisableCompaction()) {
if (config.isDisableCompaction()) {
log.info("Disabling compaction...");
distributedCheckpointerHelper.disableCompaction();
}
if (config.isFreezeCompaction()) {
log.info("Freezing compaction...");
distributedCheckpointerHelper.freezeCompaction();
}
return;
}
if (config.isUnfreezeCompaction()) {
log.info("Unfreezing compaction...");
distributedCheckpointerHelper.unfreezeCompaction();
}
if (config.isEnableCompaction()) {
log.info("Enabling compaction...");
distributedCheckpointerHelper.enableCompaction();
}
if (config.isInstantTriggerCompaction()) {
if (config.isTrim()) {
log.info("Enabling instant compaction trigger with trim...");
distributedCheckpointerHelper.instantTrigger(true);

} else {
log.info("Enabling instant compactor trigger...");
distributedCheckpointerHelper.instantTrigger(false);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.corfudb.compactor;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
* CorfuCompactorControlsConfig class parses the compactor controls specific parameters passed if any
*/
@Getter
@Slf4j
public class CompactorControllerConfig extends CompactorBaseConfig {
private final boolean upgradeDescriptorTable;
private final boolean instantTriggerCompaction;
private final boolean trim;
private final boolean freezeCompaction;
private final boolean unfreezeCompaction;
private final boolean disableCompaction;
private final boolean enableCompaction;

public CompactorControllerConfig(String[] args) {
super(args, CompactorControlsCmdLineHelper.USAGE_PARAMS, CompactorControlsCmdLineHelper.OPTIONS_PARAMS);

upgradeDescriptorTable = getOpt("--upgradeDescriptorTable").isPresent();
instantTriggerCompaction = getOpt("--instantTriggerCompaction").isPresent();
trim = getOpt("--trim").isPresent();
freezeCompaction = getOpt("--freezeCompaction").isPresent();
unfreezeCompaction = getOpt("--unfreezeCompaction").isPresent();
disableCompaction = getOpt("--disableCompaction").isPresent();
enableCompaction = getOpt("--enableCompaction").isPresent();

if (freezeCompaction && unfreezeCompaction) {
log.error("Both freeze and unfreeze compaction parameters cannot be passed together");
throw new IllegalArgumentException("Both freeze and unfreeze compaction parameters cannot be passed together");
}
if (disableCompaction && enableCompaction) {
log.error("Both enable and disable compaction parameters cannot be passed together");
throw new IllegalArgumentException("Both enable and disable compaction parameters cannot be passed together");
}
}

public static class CompactorControlsCmdLineHelper {
public static final String USAGE_PARAMS = " [--trim=<trim>] " +
"[--upgradeDescriptorTable=<upgradeDescriptorTable>] " +
"[--instantTriggerCompaction=<instantTriggerCompaction>] " +
"[--freezeCompaction=<freezeCompaction>] " +
"[--unfreezeCompaction=<unfreezeCompaction>] " +
"[--disableCompaction=<disableCompaction>] " +
"[--enableCompaction=<enableCompaction>]";
public static final String OPTIONS_PARAMS = "--trim=<trim> Should trim be performed along with instantTrigger\n"
+ "--upgradeDescriptorTable=<upgradeDescriptorTable> Repopulate descriptor table?\n"
+ "--instantTriggerCompaction=<instantTriggerCompaction> If compactor cycle needs to be triggered instantly\n"
+ "--freezeCompaction=<freezeCompaction> If compaction needs to be frozen\n"
+ "--unfreezeCompaction=<unfreezeCompaction> If compaction needs to be resumed\n"
+ "--disableCompaction=<disableCompaction> If compaction needs to be disabled\n"
+ "--enableCompaction=<enableCompaction> If compaction needs to be enabled";
}
}

0 comments on commit d86ad96

Please sign in to comment.