Skip to content

Commit

Permalink
Add check for disable compaction key (#3453)
Browse files Browse the repository at this point in the history
If the disable key is added while a compaction cycle is ongoing, the leader needs
to force-complete the cycle and mark its status as COMPLETED/FAILED.
  • Loading branch information
SravanthiAshokKumar committed Dec 19, 2022
1 parent 9efc8dc commit edda087
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,6 @@ public void validateLiveness() {
if (statusToChange == LivenessValidator.Status.FINISH) {
log.info("Invoking finishCompactionCycle");
finishCompactionCycle();
livenessValidator.clearLivenessMap();
livenessValidator.clearLivenessValidator();
}
}

Expand Down Expand Up @@ -247,6 +245,8 @@ public void finishCompactionCycle() {
tableNames.size(), finalStatus);
MicroMeterUtils.time(Duration.ofMillis(totalTimeElapsed), "compaction.total.timer",
"nodeEndpoint", nodeEndpoint);
livenessValidator.clearLivenessMap();
livenessValidator.clearLivenessValidator();
} catch (RuntimeException re) {
//Do not retry here, the compactor service will trigger this method again
// The txn should succeed otherwise the status is FAILED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.corfudb.runtime.CorfuCompactorManagement.CheckpointingStatus;
import org.corfudb.runtime.CorfuCompactorManagement.CheckpointingStatus.StatusType;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.DistributedCheckpointerHelper;
import org.corfudb.runtime.collections.CorfuStore;
import org.corfudb.runtime.collections.TxnContext;
import org.corfudb.runtime.view.Layout;
Expand Down Expand Up @@ -42,6 +43,7 @@ public class CompactorService implements ManagementService {
private Optional<CompactorLeaderServices> compactorLeaderServices = Optional.empty();
private Optional<CorfuStore> corfuStore = Optional.empty();
private TrimLog trimLog;
private DistributedCheckpointerHelper distributedCheckpointerHelper;
private final Logger log;
private static final Duration LIVENESS_TIMEOUT = Duration.ofMinutes(1);

Expand Down Expand Up @@ -79,6 +81,7 @@ public void start(Duration interval) {

getCompactorLeaderServices();
this.trimLog = new TrimLog(getCorfuRuntime(), getCorfuStore());
this.distributedCheckpointerHelper = new DistributedCheckpointerHelper(getCorfuStore());

orchestratorThread.scheduleWithFixedDelay(
() -> LambdaUtils.runSansThrow(this::runOrchestrator),
Expand Down Expand Up @@ -137,25 +140,29 @@ private void runOrchestrator() {
log.warn("Unable to acquire manager status: ", e);
}
try {
if (managerStatus != null) {
if (managerStatus.getStatus() == StatusType.FAILED || managerStatus.getStatus() == StatusType.COMPLETED) {
checkpointerJvmManager.shutdown();
} else if (managerStatus.getStatus() == StatusType.STARTED && !checkpointerJvmManager.isRunning()
&& !checkpointerJvmManager.isInvoked()) {
checkpointerJvmManager.invokeCheckpointing();
}
}

if (isLeader) {
if (managerStatus != null && managerStatus.getStatus() == StatusType.STARTED) {
getCompactorLeaderServices().validateLiveness();
if (distributedCheckpointerHelper.isCompactionDisabled()) {
log.info("Compaction has been disabled. Force finish compaction cycle as it already started");
getCompactorLeaderServices().finishCompactionCycle();
} else {
getCompactorLeaderServices().validateLiveness();
}
} else if (compactionTriggerPolicy.shouldTrigger(
getCorfuRuntime().getParameters().getCheckpointTriggerFreqMillis(), getCorfuStore())) {
trimLog.invokePrefixTrim();
compactionTriggerPolicy.markCompactionCycleStart();
getCompactorLeaderServices().initCompactionCycle();
}
}
if (managerStatus != null) {
if (managerStatus.getStatus() == StatusType.FAILED || managerStatus.getStatus() == StatusType.COMPLETED) {
checkpointerJvmManager.shutdown();
} else if (managerStatus.getStatus() == StatusType.STARTED && !checkpointerJvmManager.isRunning()
&& !checkpointerJvmManager.isInvoked()) {
checkpointerJvmManager.invokeCheckpointing();
}
}
} catch (Exception ex) {
log.warn("Exception in runOrcestrator(): ", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ private CorfuTable<CorfuDynamicKey, OpaqueCorfuDynamicRecord> openTable(TableNam
.setSerializer(serializer)
.addOpenOption(ObjectOpenOption.NO_CACHE);
if (this.checkpointerBuilder.persistedCacheRoot.isPresent()) {
log.info("Opening table in diskBacked mode");
String persistentCacheDirName = String.format("compactor_%s_%s",
tableName.getNamespace(), tableName.getTableName());
Path persistedCacheLocation = Paths.get(this.checkpointerBuilder.persistedCacheRoot.get()).resolve(persistentCacheDirName);
Expand Down
45 changes: 25 additions & 20 deletions scripts/compactor_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
1. To trigger compactor instantly without trimming at the end of the cycle
/usr/share/corfu/scripts/compactor_runner.py --port <port> --compactorConfig /usr/share/corfu/conf/corfu-compactor-config.yml --instantTriggerCompaction=True
2. To trigger compactor instantly with trimming at the end of the cycle
/usr/share/corfu/scripts/compactor_runner.py--port <port> --compactorConfig /usr/share/corfu/conf/corfu-compactor-config.yml --instantTriggerCompaction=True --trimAfterCheckpoint=True
/usr/share/corfu/scripts/compactor_runner.py --port <port> --compactorConfig /usr/share/corfu/conf/corfu-compactor-config.yml --instantTriggerCompaction=True --trimAfterCheckpoint=True
3. To freeze compactor (This stops running compactor for 2 hrs from the time of freezing)
/usr/share/corfu/scripts/compactor_runner.py--port <port> --compactorConfig /usr/share/corfu/conf/corfu-compactor-config.yml--freezeCompaction=True
/usr/share/corfu/scripts/compactor_runner.py --port <port> --compactorConfig /usr/share/corfu/conf/corfu-compactor-config.yml --freezeCompaction=True
4. To unfreeze compactor
/usr/share/corfu/scripts/compactor_runner.py--port <port> --compactorConfig /usr/share/corfu/conf/corfu-compactor-config.yml--unfreezeCompaction=True
/usr/share/corfu/scripts/compactor_runner.py --port <port> --compactorConfig /usr/share/corfu/conf/corfu-compactor-config.yml --unfreezeCompaction=True
5. To disable compactor (This stops running compactor until it is enabled again)
/usr/share/corfu/scripts/compactor_runner.py--port <port> --compactorConfig /usr/share/corfu/conf/corfu-compactor-config.yml--disableCompaction=True
/usr/share/corfu/scripts/compactor_runner.py --port <port> --compactorConfig /usr/share/corfu/conf/corfu-compactor-config.yml --disableCompaction=True
6. To enable compactor
/usr/share/corfu/scripts/compactor_runner.py--port <port> --compactorConfig /usr/share/corfu/conf/corfu-compactor-config.yml--enableCompaction=True
/usr/share/corfu/scripts/compactor_runner.py --port <port> --compactorConfig /usr/share/corfu/conf/corfu-compactor-config.yml --enableCompaction=True
"""

from __future__ import absolute_import, print_function
Expand All @@ -32,6 +32,7 @@
CORFU_COMPACTOR_CLASS_NAME = "org.corfudb.compactor.CorfuStoreCompactorMain"
COMPACTOR_BULK_READ_SIZE = 50
COMPACTOR_JVM_XMX = 1024
FORCE_DISABLE_CHECKPOINTING = "FORCE_DISABLE_CHECKPOINTING"

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
Expand All @@ -44,7 +45,6 @@ def __init__(self):
"""
self.network_interface = None
self.corfu_port = None
self.diskPath = None
self.batchSize = None
self.configPath = None
self.startCheckpointing = None
Expand All @@ -61,11 +61,13 @@ class CommandBuilder(object):
def __init__(self, config):
self._config = config

def derive_xmx_value(self):
def derive_xmx_value(self, diskBacked):
try:
mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
mem_mb = int(mem_bytes / (1024. ** 2))
xmx = max(512, int(mem_mb * 0.04))
if diskBacked is True:
xmx = max(self._config.xmx_min_in_mb, int(mem_mb * float(self._config.xmx_perc)/100))
return xmx
except Exception as ex:
return COMPACTOR_JVM_XMX
Expand Down Expand Up @@ -118,7 +120,10 @@ def _resolve_ip_address(self, ifname):
fcntl.ioctl(sock.fileno(), 0x8915, struct.pack("256s", bytes(ifname[:15], 'utf-8')))[20:24])

def get_corfu_compactor_cmd(self, compactor_config):
xmx = self.derive_xmx_value()
diskBacked = False
if "DiskBacked" in compactor_config["ConfigFiles"] and compactor_config["ConfigFiles"]['DiskBacked'] is True:
diskBacked = True
xmx = self.derive_xmx_value(diskBacked)

cmd = []
cmd.append("MALLOC_TRIM_THRESHOLD_=1310720")
Expand Down Expand Up @@ -147,7 +152,14 @@ def get_corfu_compactor_cmd(self, compactor_config):
cmd.append("--truststore=" + Security["Truststore"])
cmd.append("--truststore_password=" + Security["TruststorePassword"])

if self._config.startCheckpointing:
if diskBacked is True:
cmd.append("--persistedCacheRoot=" + compactor_config["ConfigFiles"]["DiskPath"])

# If the env var FORCE_DISABLE_CHECKPOINTING is set to True, do not run checkpointing.
# This is used by Upgrade dry-run tool to disable compaction on one node.
force_disable_checkpointing = (os.environ.get(FORCE_DISABLE_CHECKPOINTING, "False") == "True")

if not force_disable_checkpointing and self._config.startCheckpointing:
cmd.append("--startCheckpointing=true")
if self._config.instantTriggerCompaction:
cmd.append("--instantTriggerCompaction=true")
Expand Down Expand Up @@ -186,8 +198,8 @@ def run(self):
self._run_corfu_compactor()

def _print_and_log(self, msg):
print(msg)
logger.info(msg)
print(msg)

# Disk space: max 50MB. keep 1000 files.
# Mem space: max 50MB, most of the time 50KB. CORFU_GC_MAX_INDEX 1000 files, each file is 50KB.
Expand Down Expand Up @@ -224,8 +236,8 @@ def _run_corfu_compactor(self):
compactor_config = yaml.load(config)
corfu_paths = compactor_config["CorfuPaths"]
logging.basicConfig(filename=corfu_paths["CompactorLogfile"],
format='%(asctime)s.%(msecs)03dZ %(levelname)5s Runner - %(message)s',
datefmt='%Y-%m-%dT%H:%M:%S')
format='%(asctime)s.%(msecs)03dZ %(levelname)5s Runner - %(message)s',
datefmt='%Y-%m-%dT%H:%M:%S')
# Copy mem jvm gc log files to disk
try:
self._rsync_log(corfu_paths["CorfuMemLogPrefix"], corfu_paths["CorfuDiskLogDir"])
Expand All @@ -236,7 +248,7 @@ def _run_corfu_compactor(self):
if self._config.upgrade:
self._print_and_log("Appliance upgrading, skipping grep")
else:
grep_running_tool = "ps aux | grep '/usr/share/corfu/scripts/compactor_runner.py\|corfu_compactor_upgrade_runner.py' | grep -v 'grep\|CorfuServer' | grep " + self._config.corfu_port
grep_running_tool = "ps aux | grep 'python3 /usr/share/corfu/scripts/compactor_runner.py\|corfu_compactor_upgrade_runner.py' | grep -v 'grep\|CorfuServer' | grep " + self._config.corfu_port

try:
grep_tool_result = check_output(grep_running_tool, shell=True).decode()
Expand Down Expand Up @@ -282,8 +294,6 @@ def _complete_config(self, args):
config.corfu_port = args.port
if args.hostname:
config.hostname = args.hostname
if args.diskPath:
config.diskPath = args.diskPath
config.configPath = args.compactorConfig
config.instantTriggerCompaction = args.instantTriggerCompaction
config.trim = args.trimAfterCheckpoint
Expand Down Expand Up @@ -314,11 +324,6 @@ def _complete_config(self, args):
"Default value is 9000.",
required=False,
default="9000")
arg_parser.add_argument("--diskPath", type=str,
help="Temporary path to materialize a table"
"Default value is /tmp",
required=False,
default="/tmp/diskonlycorfutable/")
arg_parser.add_argument("--batchSize", type=str,
help="Batch size for loadTable",
required=False)
Expand Down

0 comments on commit edda087

Please sign in to comment.