Skip to content

Commit

Permalink
Modify compactor script to accommodate more memory options (#3558)
Browse files Browse the repository at this point in the history
The compactor script is modified to accommodate more memory options
that will be passed to it as a config file
  • Loading branch information
SravanthiAshokKumar committed Mar 21, 2023
1 parent 4a45744 commit 7f11d1c
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions scripts/compactor_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@

COMPACTOR_CONTROLS_CLASS_NAME = "org.corfudb.compactor.CompactorController"
COMPACTOR_CHECKPOINTER_CLASS_NAME = "org.corfudb.compactor.CompactorCheckpointer"
COMPACTOR_BULK_READ_SIZE = 50
COMPACTOR_BULK_READ_SIZE = 20
COMPACTOR_JVM_XMX = 1024
FORCE_DISABLE_CHECKPOINTING = "FORCE_DISABLE_CHECKPOINTING"
POD_NAME = "POD_NAME"
POD_NAMESPACE = "POD_NAMESPACE"
SMALL_XMX = 512
MEDIUM_XMX = 1000

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -62,13 +64,20 @@ class CommandBuilder(object):
def __init__(self, config):
self._config = config

def derive_xmx_value(self, diskBacked):
def derive_xmx_value(self, diskBacked, compactor_config):
try:
mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
mem_mb = int(mem_bytes / (1024. ** 2))
xmx = max(512, int(mem_mb * 0.04))
xmx = max(SMALL_XMX, int(mem_mb * 0.04))
if diskBacked is True:
xmx = max(self._config.xmx_min_in_mb, int(mem_mb * float(self._config.xmx_perc)/100))
if xmx > MEDIUM_XMX and "Large" in compactor_config["MemoryOptions"]:
xmx = compactor_config["MemoryOptions"]["Large"]
elif xmx > SMALL_XMX and xmx < MEDIUM_XMX and "Medium" in compactor_config["MemoryOptions"]:
xmx = compactor_config["MemoryOptions"]["Medium"]
elif xmx == SMALL_XMX and "Small" in compactor_config["MemoryOptions"]:
xmx = compactor_config["MemoryOptions"]["Small"]
else:
xmx = max(self._config.xmx_min_in_mb, int(mem_mb * float(self._config.xmx_perc)/100))
return xmx
except Exception as ex:
return COMPACTOR_JVM_XMX
Expand Down Expand Up @@ -155,9 +164,9 @@ def _resolve_ip_address(self, ifname):

def get_corfu_compactor_cmd(self, compactor_config, class_to_invoke):
diskBacked = False
if "DiskBacked" in compactor_config["ConfigFiles"] and compactor_config["ConfigFiles"]['DiskBacked'] is True:
if "DiskBacked" in compactor_config["MemoryOptions"] and compactor_config["MemoryOptions"]['DiskBacked'] is True:
diskBacked = True
xmx = self.derive_xmx_value(diskBacked)
xmx = self.derive_xmx_value(diskBacked, compactor_config)

cmd = []
cmd.append("MALLOC_TRIM_THRESHOLD_=1310720")
Expand Down Expand Up @@ -186,7 +195,7 @@ def get_corfu_compactor_cmd(self, compactor_config, class_to_invoke):
cmd.append("--truststore_password=" + Security["TruststorePassword"])

if diskBacked is True:
cmd.append("--persistedCacheRoot=" + compactor_config["ConfigFiles"]["DiskPath"])
cmd.append("--persistedCacheRoot=" + compactor_config["MemoryOptions"]["DiskPath"])

if not self._config.startCheckpointing:
if self._config.instantTriggerCompaction:
Expand Down

0 comments on commit 7f11d1c

Please sign in to comment.