Skip to content

Commit

Permalink
Bug fix for local execution ignoring cancellation. (#104)
Browse files (browse the repository at this point in the history)
* Tweaks to execution of ready steps to check for cancellation.

* Tweak to break the monitor loop if the study is cancelled.
  • Loading branch information
FrankD412 committed May 22, 2018
1 parent 420fe48 commit ccc5236
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 3 deletions.
5 changes: 3 additions & 2 deletions maestrowf/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,9 @@ def monitor_study(dag, pickle_path, cancel_lock_path, sleep_time):
dag.pickle(pickle_path)
# Write out the state
dag.write_status(os.path.split(pickle_path)[0])
# Sleep for SLEEPTIME in args
sleep(sleep_time)
# Sleep for SLEEPTIME in args if study not complete.
if not study_complete:
sleep(sleep_time)


def main():
Expand Down
27 changes: 26 additions & 1 deletion maestrowf/datastructures/core/executiongraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,9 @@ def __init__(self, submission_attempts=1, submission_throttle=0,
self.completed_steps = set([SOURCE])
self.in_progress = set()
self.failed_steps = set()
self.cancelled_steps = set()
self.ready_steps = deque()
self.is_canceled = False

# Values for management of the DAG. Things like submission attempts,
# throttling, etc. should be listed here.
Expand Down Expand Up @@ -637,7 +639,8 @@ def execute_ready_steps(self):
adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"])
adapter = adapter(**self._adapter)

resolved_set = self.completed_steps | self.failed_steps
resolved_set = \
self.completed_steps | self.failed_steps | self.cancelled_steps
if not set(self.values.keys()) - resolved_set:
# Just return for now, but we'll need a way to signal that there
# are no more things to run.
Expand Down Expand Up @@ -784,9 +787,30 @@ def execute_ready_steps(self):
for i in range(0, _available):
# Pop the record and execute using the helper method.
_record = self.ready_steps.popleft()

# If we get to this point and we've cancelled, cancel the record.
if self.is_canceled:
logger.info("Cancelling '%s' -- continuing.", _record.name)
_record.mark_end(State.CANCELLED)
self.cancelled_steps.add(_record.name)
continue

logger.debug("Launching job %d -- %s", i, _record.name)
self._execute_record(_record, adapter)

# We cancelled, return True marking study as complete.
if self.is_canceled:
logger.info("Cancelled -- completing study.")
return True

resolved_set = \
self.completed_steps | self.failed_steps | self.cancelled_steps
if not set(self.values.keys()) - resolved_set:
# Just return for now, but we'll need a way to signal that there
# are no more things to run.
logging.info("'%s' is complete. Returning.", self.name)
return True

return False

def check_study_status(self):
Expand Down Expand Up @@ -839,6 +863,7 @@ def cancel_study(self):

# cancel our jobs
retcode = adapter.cancel_jobs(joblist)
self.is_canceled = True

if retcode == CancelCode.OK:
logger.info("Successfully requested to cancel all jobs.")
Expand Down

0 comments on commit ccc5236

Please sign in to comment.