Skip to content

Commit

Permalink
Addition of a simple dry-run capability. (#259)
Browse files Browse the repository at this point in the history
* Addition of a simple dry-run capability.

* Addition of a DRYRUN state.

* Tweak to reduce sleep time for dry run.

* Renamed dryrun to dry to reduce redundancy.

* Enable autoyes when dry running is invoked.
  • Loading branch information
Francesco Di Natale committed May 13, 2020
1 parent ed4311a commit 0ae906f
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 19 deletions.
1 change: 1 addition & 0 deletions maestrowf/abstracts/enums/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class State(Enum):
TIMEDOUT = 10
UNKNOWN = 11
CANCELLED = 12
DRYRUN = 13


class StudyStatus(Enum):
Expand Down
29 changes: 24 additions & 5 deletions maestrowf/datastructures/core/executiongraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ class ExecutionGraph(DAG, PickleInterface):
"""

def __init__(self, submission_attempts=1, submission_throttle=0,
use_tmp=False):
use_tmp=False, dry_run=False):
"""
Initialize a new instance of an ExecutionGraph.
Expand Down Expand Up @@ -336,6 +336,7 @@ def __init__(self, submission_attempts=1, submission_throttle=0,
# throttling, etc. should be listed here.
self._submission_attempts = submission_attempts
self._submission_throttle = submission_throttle
self.dry_run = dry_run

# A map that tracks the dependencies of a step.
# NOTE: I don't know how performant the Python dict structure is, but
Expand Down Expand Up @@ -536,6 +537,20 @@ def _execute_record(self, record, adapter, restart=False):
# 1. If the JobStatus is not OK.
# 2. num_restarts is less than self._submission_attempts
self._check_tmp_dir()

# Only set up the workspace the initial iteration.
if not restart:
LOGGER.debug("Setting up workspace for '%s' at %s",
record.name, str(datetime.now()))
# Generate the script for execution on the fly.
record.setup_workspace() # Generate the workspace.
record.generate_script(adapter, self._tmp_dir)

if self.dry_run:
record.mark_end(State.DRYRUN)
self.completed_steps.add(record.name)
return

while retcode != SubmissionCode.OK and \
num_restarts < self._submission_attempts:
LOGGER.info("Attempting submission of '%s' (attempt %d of %d)...",
Expand All @@ -546,9 +561,6 @@ def _execute_record(self, record, adapter, restart=False):
if not restart:
LOGGER.debug("Calling 'execute' on '%s' at %s",
record.name, str(datetime.now()))
# Generate the script for execution on the fly.
record.setup_workspace() # Generate the workspace.
record.generate_script(adapter, self._tmp_dir)
retcode = record.execute(adapter)
# Otherwise, it's a restart.
else:
Expand Down Expand Up @@ -659,7 +671,14 @@ def execute_ready_steps(self):
adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
adapter = adapter(**self._adapter)

retcode, job_status = self.check_study_status()
if not self.dry_run:
LOGGER.debug("Checking status check...")
retcode, job_status = self.check_study_status()
else:
LOGGER.debug("DRYRUN: Skipping status check...")
retcode = JobStatusCode.OK
job_status = {}

LOGGER.debug("Checked status (retcode %s)-- %s", retcode, job_status)

# For now, if we can't check the status something is wrong.
Expand Down
19 changes: 14 additions & 5 deletions maestrowf/datastructures/core/study.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,14 @@ def __init__(self, name, description,
LOGGER.debug("OUTPUT_PATH = %s", out_path)
# Flag the study as not having been set up and add the source node.
self._issetup = False
self.is_configured = False
self.add_node(SOURCE, None)

# Settings for handling restarts and submission attempts.
self._restart_limit = 0
self._submission_attempts = 0
self._use_tmp = False
self._dry_run = False

# Management structures
# The workspace used by each step.
Expand Down Expand Up @@ -390,7 +392,8 @@ def setup_environment(self):
self.environment.acquire_environment()

def configure_study(self, submission_attempts=1, restart_limit=1,
throttle=0, use_tmp=False, hash_ws=False):
throttle=0, use_tmp=False, hash_ws=False,
dry_run=False):
"""
Perform initial configuration of a study. \
Expand All @@ -405,6 +408,8 @@ def configure_study(self, submission_attempts=1, restart_limit=1,
denotes no cap].\
:param use_tmp: Boolean value specifying if the generated \
ExecutionGraph dumps its information into a temporary directory. \
:param dry_run: Boolean value that toggles dry run to just generate \
study workspaces and scripts without execution or status checking. \
:returns: True if the Study is successfully setup, False otherwise. \
"""

Expand All @@ -413,20 +418,24 @@ def configure_study(self, submission_attempts=1, restart_limit=1,
self._submission_throttle = throttle
self._use_tmp = use_tmp
self._hash_ws = hash_ws
self._dry_run = dry_run

LOGGER.info(
"\n------------------------------------------\n"
"Output path = %s\n"
"Submission attempts = %d\n"
"Submission restart limit = %d\n"
"Submission throttle limit = %d\n"
"Use temporary directory = %s\n"
"Hash workspaces = %s\n"
"Dry run enabled = %s\n"
"Output path = %s\n"
"------------------------------------------",
self._out_path, submission_attempts, restart_limit, throttle,
use_tmp, hash_ws
submission_attempts, restart_limit, throttle,
use_tmp, hash_ws, dry_run, self._out_path
)

self.is_configured = True

def _stage(self, dag):
"""
Set up the ExecutionGraph of a parameterized study.
Expand Down Expand Up @@ -828,7 +837,7 @@ def stage(self):
dag = ExecutionGraph(
submission_attempts=self._submission_attempts,
submission_throttle=self._submission_throttle,
use_tmp=self._use_tmp)
use_tmp=self._use_tmp, dry_run=self._dry_run)
dag.add_description(**self.description)
dag.log_description()

Expand Down
24 changes: 15 additions & 9 deletions maestrowf/maestro.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,18 @@ def run_study(args):

# Set up the study workspace and configure it for execution.
study.setup_workspace()
study.setup_environment()
study.configure_study(
throttle=args.throttle, submission_attempts=args.attempts,
restart_limit=args.rlimit, use_tmp=args.usetmp, hash_ws=args.hashws)
restart_limit=args.rlimit, use_tmp=args.usetmp, hash_ws=args.hashws,
dry_run=args.dry)
study.setup_environment()

if args.dry:
# If performing a dry run, drive sleep time down to generate scripts.
sleeptime = 1
else:
# else, use args to decide sleeptime
sleeptime = args.sleeptime

batch = {"type": "local"}
if spec.batch:
Expand All @@ -245,15 +253,13 @@ def run_study(args):
batch["type"] = "local"
# Copy the spec to the output directory
shutil.copy(args.specification, study.output_path)
# Check for a dry run
if args.dryrun:
raise NotImplementedError("The 'dryrun' mode is in development.")

# Use the Conductor's classmethod to store the study.
Conductor.store_study(study)
Conductor.store_batch(study.output_path, batch)

# If we are automatically launching, just set the input as yes.
if args.autoyes:
if args.autoyes or args.dry:
uinput = "y"
elif args.autono:
uinput = "n"
Expand All @@ -265,7 +271,7 @@ def run_study(args):
# Launch in the foreground.
LOGGER.info("Running Maestro Conductor in the foreground.")
conductor = Conductor(study)
conductor.initialize(batch, args.sleeptime)
conductor.initialize(batch, sleeptime)
completion_status = conductor.monitor_study()
conductor.cleanup()
return completion_status.value
Expand All @@ -276,7 +282,7 @@ def run_study(args):
*["{}.txt".format(study.name)])

cmd = ["nohup", "conductor",
"-t", str(args.sleeptime),
"-t", str(sleeptime),
"-d", str(args.debug_lvl),
study.output_path,
">", log_path, "2>&1"]
Expand Down Expand Up @@ -325,7 +331,7 @@ def setup_argparser():
run.add_argument("-s", "--sleeptime", type=int, default=60,
help="Amount of time (in seconds) for the manager to "
"wait between job status checks. [Default: %(default)d]")
run.add_argument("-d", "--dryrun", action="store_true", default=False,
run.add_argument("--dry", action="store_true", default=False,
help="Generate the directory structure and scripts for a "
"study but do not launch it. [Default: %(default)s]")
run.add_argument("-p", "--pgen", type=str,
Expand Down

0 comments on commit 0ae906f

Please sign in to comment.