Skip to content

Commit

Permalink
Modify compactor script (#3490)
Browse files Browse the repository at this point in the history
* Modify compactor script

Only server can trigger the script with startCheckpointing param.
Hence modify grep accordingly. Also remove upgrade param.

* Add compactor_utils to be used for compactor verification
  • Loading branch information
SravanthiAshokKumar committed Jan 31, 2023
1 parent 71cb842 commit b8e5d20
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 22 deletions.
40 changes: 18 additions & 22 deletions scripts/compactor_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ def __init__(self):
Fields are public to keep it simple.
"""
self.network_interface = None
self.hostname = None
self.corfu_port = None
self.batchSize = None
self.configPath = None
self.startCheckpointing = None
self.instantTriggerCompaction = None
Expand Down Expand Up @@ -177,7 +177,7 @@ def get_corfu_compactor_cmd(self, compactor_config):
return " ".join(cmd)


class Wizard(object):
class CompactorRunner(object):
def __init__(self, args):
"""
Initialize components and read user provided configuration.
Expand Down Expand Up @@ -234,7 +234,7 @@ def _run_corfu_compactor(self):
Note: need to ensure the 15min gap between two runs of this tool to avoid trim exception.
"""
with open(self._config.configPath, "r") as config:
compactor_config = yaml.load(config)
compactor_config = yaml.load(config, yaml.FullLoader)
corfu_paths = compactor_config["CorfuPaths"]
logging.basicConfig(filename=corfu_paths["CompactorLogfile"],
format='%(asctime)s.%(msecs)03dZ %(levelname)5s Runner - %(message)s',
Expand All @@ -245,11 +245,8 @@ def _run_corfu_compactor(self):
except Exception as ex:
self._print_and_log("Failed to run rsync_log " + " error: " + str(ex))

# Grep the compactor tool process, exclude grep itself.
if self._config.upgrade:
self._print_and_log("Appliance upgrading, skipping grep")
else:
grep_running_tool = "ps aux | grep 'python3 /usr/share/corfu/scripts/compactor_runner.py\|corfu_compactor_upgrade_runner.py' | grep -v 'grep\|CorfuServer' | grep " + self._config.corfu_port
if self._config.startCheckpointing:
grep_running_tool = "ps aux | grep 'python3 /usr/share/corfu/scripts/compactor_runner.py' | grep 'startCheckpointing' | grep -v 'grep' | grep " + self._config.corfu_port

try:
grep_tool_result = check_output(grep_running_tool, shell=True).decode()
Expand Down Expand Up @@ -293,20 +290,23 @@ def _complete_config(self, args):
config = Config()
config.network_interface = args.ifname
config.corfu_port = args.port
if args.hostname:
config.hostname = args.hostname
config.configPath = args.compactorConfig
config.instantTriggerCompaction = args.instantTriggerCompaction
config.trim = args.trimAfterCheckpoint
config.upgrade = args.upgrade
if not config.upgrade:
if 'hostname' in args and args.hostname:
config.hostname = args.hostname
if 'instantTriggerCompaction' in args:
config.instantTriggerCompaction = args.instantTriggerCompaction
if 'trimAfterCheckpoint' in args:
config.trim = args.trimAfterCheckpoint
if 'freezeCompaction' in args:
config.freezeCompaction = args.freezeCompaction
if 'unfreezeCompaction' in args:
config.unfreezeCompaction = args.unfreezeCompaction
if 'disableCompaction' in args:
config.disableCompaction = args.disableCompaction
if 'enableCompaction' in args:
config.enableCompaction = args.enableCompaction
if 'startCheckpointing' in args:
config.startCheckpointing = args.startCheckpointing
else:
config.startCheckpointing = True
return config

if __name__ == "__main__":
Expand All @@ -325,17 +325,13 @@ def _complete_config(self, args):
"Default value is 9000.",
required=False,
default="9000")
arg_parser.add_argument("--batchSize", type=str,
help="Batch size for loadTable",
required=False)
arg_parser.add_argument("--compactorConfig", type=str,
help="The file containing config for compactor",
default="/usr/share/corfu/conf/corfu-compactor-config.yml",
required=False)
arg_parser.add_argument("--startCheckpointing", type=bool, default=False,
help="Start checkpointing tables if compaction cycle has started",
required=False)
arg_parser.add_argument("--upgrade", type=bool, required=False)
arg_parser.add_argument("--instantTriggerCompaction", type=bool,
help="To instantly trigger compaction cycle",
required=False)
Expand All @@ -355,6 +351,6 @@ def _complete_config(self, args):
help="To enable trim again after checkpointing all tables",
required=False)
args = arg_parser.parse_args()
wizard = Wizard(args)
wizard.run()
compactor_runner = CompactorRunner(args)
compactor_runner.run()

58 changes: 58 additions & 0 deletions scripts/compactor_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import logging
import subprocess
import time
import yaml

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

NUM_RETRIES = 120

def set_logger(config_file_path):
with open(config_file_path, "r") as config:
compactor_config = yaml.load(config, yaml.FullLoader)
corfu_paths = compactor_config["CorfuPaths"]
logging.basicConfig(filename=corfu_paths["CompactorLogfile"],
format='%(asctime)s.%(msecs)03dZ %(levelname)5s Runner - %(message)s',
datefmt='%Y-%m-%dT%H:%M:%S', force = True)

def log_info(msg):
logger.info(msg)

def wait_for_compactor_end(port, target_cycle_count):
for _ in range(NUM_RETRIES):
status, cycle_count = get_compactor_status(port)
if cycle_count >= target_cycle_count and status != "STARTED":
log_info("status: " + status + " cycleCount: " + str(cycle_count))
return
else:
time.sleep(5)

def get_compactor_status(port):
cmd = "/opt/vmware/bin/corfu_tool_runner.py -t CompactionManagerTable -n CorfuSystem -o showTable --port " + port
result = subprocess.check_output(cmd, shell=True).decode()
status = "IDLE"
cycle_count = 0
for line in result.split("\n"):
if "status\"" in line:
status = line.split(":")[-1].split("\"")[1]
if "cycleCount\"" in line:
cycle_count = line.split(":")[-1].split("\"")[1]
break
return status, int(cycle_count)

def wait_and_verify_compactor_success(port, target_cycle_count):
"""
This method waits for a compactor cycle as specified by the target_cycle_count to end
:param port: port to which the compactor client should connect to the corfu server
:param target_cycle_count: the number that denotes the compactor cycle
:return:
True if the compactor cycle completed successfully, else returns False
"""
wait_for_compactor_end(port, target_cycle_count)
status, cycle_count = get_compactor_status(port)
if status == "COMPLETED":
log_info("Compaction status for corfu at port: " + str(port) + " is COMPLETED")
return True
log_info("Compaction status for corfu at port: " + str(port) + " is " + status)
return False

0 comments on commit b8e5d20

Please sign in to comment.