Skip to content

Commit

Permalink
Merge pull request #728 from muhrin/fix_722_failed_wf
Browse files Browse the repository at this point in the history
Fixes #722 issue affecting failed workflows
  • Loading branch information
sphuber committed Sep 27, 2017
2 parents c73a010 + bc91c96 commit 1aa2bd3
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions aiida/work/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,11 @@ def __init__(self, running_directory=_RUNNING_DIRECTORY,
Process pickles in. If None they will be deleted on fail.
:type failed_directory: str
"""
self._running_directory = running_directory
self._finished_directory = finished_directory
self._failed_directory = failed_directory
super(Persistence, self).__init__(
running_directory=running_directory,
finished_directory=finished_directory,
failed_directory=failed_directory
)
self._filelocks = {}

@property
Expand Down Expand Up @@ -168,7 +170,7 @@ def load_all_processes(self):
continue
except BaseException:
LOGGER.warning("Failed to load checkpoint '{}' (deleting)\n{}"
.format(f, traceback.format_exc()))
.format(f, traceback.format_exc()))

try:
os.remove(f)
Expand Down Expand Up @@ -298,22 +300,22 @@ def on_process_finish(self, process):
def on_process_destroy(self, process):
self.unpersist_process(process)


############################################################################

# ProcessMonitorListener messages ##########################################
@override
def on_monitored_process_failed(self, pid):
try:
self._release_process(process.pid, self.failed_directory)
self._release_process(pid, self.failed_directory)
except ValueError:
pass

############################################################################

@override
def on_monitored_process_created(self, process):
self.persist_process(process)
if self._auto_persist and process.pid not in self._filelocks:
self.persist_process(process)

@staticmethod
def _ensure_directory(dir_path):
Expand All @@ -332,7 +334,10 @@ def _release_process(self, pid, save_dir=None):
"""
# Get the current location of the pickle
pickle_path = self.get_running_path(pid)
lock = self._filelocks.pop(pid)
try:
lock = self._filelocks.pop(pid)
except KeyError:
raise ValueError("Unknown process with pid '{}'".format(pid))

try:
if path.isfile(pickle_path):
Expand Down Expand Up @@ -467,4 +472,4 @@ def _create_storage():
running_directory=os.path.join(WORKFLOWS_DIR, 'running'),
finished_directory=os.path.join(WORKFLOWS_DIR, 'finished'),
failed_directory=os.path.join(WORKFLOWS_DIR, 'failed')
)
)

0 comments on commit 1aa2bd3

Please sign in to comment.