Commit

Merge 0786ae1 into e6d5402
mlin committed Apr 12, 2020
2 parents e6d5402 + 0786ae1; commit ab4fa19
Showing 3 changed files with 88 additions and 44 deletions.
15 changes: 12 additions & 3 deletions WDL/runtime/error.py
@@ -20,25 +20,34 @@ class CommandFailed(_RuntimeError):
     """
 
     def __init__(self, exit_status: int, stderr_file: str, message: str = "") -> None:
-        super().__init__(message or f"task command failed with exit status {exit_status}")
+        oom_hint = ", a possible indication that it ran out of memory" if exit_status == 137 else ""
+        super().__init__(message or f"task command failed with exit status {exit_status}{oom_hint}")
         self.exit_status = exit_status
         self.stderr_file = stderr_file
 
 
 class Terminated(_RuntimeError):
     """
-    Workflow/task was terminated, e.g. by Unix signal
+    Workflow/task was intentionally terminated, e.g. by Unix signal
     """
 
     quiet: bool
     """
-    Termination was a secondary side-effect, so warrants less logging
+    Termination warrants less logging because it was a secondary side-effect of a previous error
     """
 
     def __init__(self, quiet: bool = False) -> None:
         self.quiet = quiet
 
 
+class Interrupted(_RuntimeError):
+    """
+    Task was interrupted by an exogenous problem (e.g. worker node went down)
+    """
+
+    pass
+
+
 class OutputError(_RuntimeError):
     """
     Failure whilst gathering task outputs
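For orientation, here is a minimal sketch (not part of the diff) of how a caller might route these three error classes; the `attempt` callable and `run_with_retry` wrapper are hypothetical, while the real retry logic lives in _try_task in WDL/runtime/task.py below:

    from WDL.runtime.error import CommandFailed, Interrupted, Terminated

    def run_with_retry(attempt, max_interruptions=1):
        interruptions = 0
        while True:
            try:
                return attempt()
            except Interrupted:
                # exogenous problem (e.g. lost worker node): retry while budget remains
                if interruptions >= max_interruptions:
                    raise
                interruptions += 1
            except Terminated:
                raise  # intentional termination: never retry
            except CommandFailed as exn:
                # exit status 137 = 128 + SIGKILL(9), commonly the OOM killer
                print(f"command failed ({exn.exit_status}); stderr at {exn.stderr_file}")
                raise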
114 changes: 74 additions & 40 deletions WDL/runtime/task.py
@@ -216,12 +216,12 @@ def _run(
         # run command in container & return exit status
         raise NotImplementedError()
 
-    def reset(self, logger: logging.Logger, prev_retries: int, delete_work: bool = False) -> None:
+    def reset(self, logger: logging.Logger, retries: int, delete_work: bool = False) -> None:
         """
         After a container/command failure, reset the working directory state so that
         copy_input_files() and run() can be retried.
         """
-        artifacts_dir = os.path.join(self.host_dir, "failed_tries", str(prev_retries))
+        artifacts_dir = os.path.join(self.host_dir, "failed_tries", str(retries))
         artifacts = []
         for artifact in ["work", "command", "stdout.txt", "stderr.txt", "stderr.txt.offset"]:
             src = os.path.join(self.host_dir, artifact)
@@ -611,33 +611,42 @@ def poll_service(
                 len(self._observed_states or []) <= 1
             ), "docker task shouldn't disappear from service"
 
+            # references on docker task states:
+            # https://docs.docker.com/engine/swarm/how-swarm-mode-works/swarm-task-states/
+            # https://github.com/docker/swarmkit/blob/master/design/task_model.md
+            # https://github.com/moby/moby/blob/8fbf2598f58fb212230e6ddbcfbde628b0458250/api/types/swarm/task.go#L12
+
             # log each new state
-            assert isinstance(self._observed_states, set)
-            if status["State"] not in self._observed_states:
+            state = status["State"]
+            assert isinstance(state, str) and isinstance(self._observed_states, set)
+            if state not in self._observed_states:
                 loginfo = {"service": svc.short_id}
                 if tasks:
                     loginfo["task"] = tasks[0]["ID"][:10]
                     if "NodeID" in tasks[0]:
                         loginfo["node"] = tasks[0]["NodeID"][:10]
-                method = logger.notice if status["State"] == "running" else logger.info  # pyre-fixme
-                method(_(f"docker task {status['State']}", **loginfo))
-                self._observed_states.add(status["State"])
+                if status.get("Err", None):
+                    loginfo["Err"] = status["Err"]
+                method = logger.notice if state == "running" else logger.info  # pyre-fixme
+                method(_(f"docker task {state}", **loginfo))
+                self._observed_states.add(state)
 
-            # https://docs.docker.com/engine/swarm/how-swarm-mode-works/swarm-task-states/
-            # https://github.com/moby/moby/blob/8fbf2598f58fb212230e6ddbcfbde628b0458250/api/types/swarm/task.go#L12
+            # determine whether docker task has exited
             exit_code = None
             if "ExitCode" in status.get("ContainerStatus", {}):
                 exit_code = status["ContainerStatus"]["ExitCode"]  # pyre-fixme
                 assert isinstance(exit_code, int)
-            if exit_code != 0 or status["State"] == "complete":
-                logger.notice(  # pyre-fixme
-                    _("docker task exit", state=status["State"], exit_code=exit_code)
-                )
-                return exit_code
 
-            if status["State"] in ["failed", "rejected", "orphaned", "remove"]:
-                raise RuntimeError(
-                    f"docker task {status['State']}"
-                    + ((": " + status["Err"]) if "Err" in status else "")
+            if state in ["complete", "failed"]:
+                logger.notice(_("docker task exit", state=state, exit_code=exit_code))  # pyre-fixme
+                assert isinstance(exit_code, int) and (exit_code == 0) == (state == "complete")
+                return exit_code
+            elif state in ["rejected", "shutdown", "orphaned", "remove"] or exit_code not in [None, 0]:
+                # note: worker shutdown seems to manifest as state=running, exit_code=-1
+                raise (RuntimeError if state == "rejected" else Interrupted)(  # pyre-ignore
+                    f"docker task {state}"
+                    + (f", exit code = {exit_code}" if exit_code not in [None, 0] else "")
+                    + (f": {status['Err']}" if "Err" in status else "")
                 )
 
             return None
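The rewritten exit handling boils down to a small decision table over the swarm task state and observed exit code. A standalone restatement, illustrative only and simplified from the method above:

    from typing import Optional
    from WDL.runtime.error import Interrupted

    def classify(state: str, exit_code: Optional[int]) -> Optional[int]:
        # ran to completion (successfully or not): report the exit code
        if state in ["complete", "failed"]:
            return exit_code
        # the service itself was refused: a hard error, not retryable
        if state == "rejected":
            raise RuntimeError(f"docker task {state}")
        # other terminal states, or an anomalous exit code while nominally
        # running (worker shutdown seems to manifest as state=running,
        # exit_code=-1), indicate an exogenous, retryable interruption
        if state in ["shutdown", "orphaned", "remove"] or exit_code not in [None, 0]:
            raise Interrupted(f"docker task {state}, exit code = {exit_code}")
        return None  # still pending/running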
Expand Down Expand Up @@ -1053,6 +1062,8 @@ def _eval_task_runtime(

if "maxRetries" in runtime_values:
ans["maxRetries"] = max(0, runtime_values["maxRetries"].coerce(Type.Int()).value)
if "preemptible" in runtime_values:
ans["preemptible"] = max(0, runtime_values["preemptible"].coerce(Type.Int()).value)

if ans:
logger.info(_("effective runtime", **ans))
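The new key follows the maxRetries pattern exactly: coerce to Int, then clamp at zero. A quick illustration of the clamping with hypothetical values:

    raw = {"maxRetries": 2, "preemptible": -1}  # as if parsed from a task's runtime{} section
    ans = {key: max(0, value) for key, value in raw.items()}
    assert ans == {"maxRetries": 2, "preemptible": 0}  # negative values degrade to "no retries"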
@@ -1071,10 +1082,13 @@ def _try_task(
     runtime: Dict[str, Union[int, str]],
 ) -> None:
     """
-    Run the task command in the container, with up to runtime.maxRetries
+    Run the task command in the container, retrying up to runtime.preemptible occurrences of
+    Interrupted errors, plus up to runtime.maxRetries occurrences of any error.
     """
-    maxRetries = runtime.get("maxRetries", 0)
-    prevRetries = 0
+    max_retries = runtime.get("maxRetries", 0)
+    max_interruptions = runtime.get("preemptible", 0)
+    retries = 0
+    interruptions = 0
 
     while True:
         # copy input files, if needed
@@ -1083,33 +1097,53 @@
 
         try:
             # start container & run command
-            return container.run(
-                logger,
-                command,
-                int(runtime.get("cpu", 0)),
-                int(runtime.get("memory_reservation", 0)),
-                int(runtime.get("memory_limit", 0)),
-            )
+            try:
+                return container.run(
+                    logger,
+                    command,
+                    int(runtime.get("cpu", 0)),
+                    int(runtime.get("memory_reservation", 0)),
+                    int(runtime.get("memory_limit", 0)),
+                )
+            finally:
+                if (
+                    "preemptible" in runtime
+                    and cfg.has_option("task_runtime", "_mock_interruptions")
+                    and interruptions < cfg["task_runtime"].get_int("_mock_interruptions")
+                ):
+                    raise Interrupted("mock interruption") from None
         except Exception as exn:
-            if isinstance(exn, Terminated) or prevRetries >= maxRetries:
-                raise
-            logger.error(
-                _(
-                    "task failure will be retried",
-                    error=exn.__class__.__name__,
-                    message=str(exn),
-                    prevRetries=prevRetries,
-                    maxRetries=maxRetries,
-                )
-            )
+            if isinstance(exn, Interrupted) and interruptions < max_interruptions:
+                logger.error(
+                    _(
+                        "interrupted task will be retried",
+                        error=exn.__class__.__name__,
+                        message=str(exn),
+                        prev_interruptions=interruptions,
+                        max_interruptions=max_interruptions,
+                    )
+                )
+                interruptions += 1
+            elif not isinstance(exn, Terminated) and retries < max_retries:
+                logger.error(
+                    _(
+                        "failed task will be retried",
+                        error=exn.__class__.__name__,
+                        message=str(exn),
+                        prev_retries=retries,
+                        max_retries=max_retries,
+                    )
+                )
+                retries += 1
+            else:
+                raise
             container.reset(
                 logger,
-                prevRetries,
+                interruptions + retries - 1,
                 delete_work=(
                     cfg["task_runtime"]["delete_work"].strip().lower() in ["always", "failure"]
                 ),
             )
-            prevRetries += 1
 
 
 def _eval_task_outputs(
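The two budgets in _try_task are charged independently: an Interrupted failure consumes runtime.preemptible, while any other non-Terminated failure consumes runtime.maxRetries, so the worst case is 1 + maxRetries + preemptible attempts. A sketch of the accounting under assumed budgets of 2 each:

    max_retries, max_interruptions = 2, 2
    worst_case_attempts = 1 + max_retries + max_interruptions  # = 5
    # container.reset() archives each failed attempt under failed_tries/<n>,
    # where n = interruptions + retries - 1 after the increment; a run that
    # exhausts both budgets leaves failed_tries/0 through failed_tries/3.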
3 changes: 2 additions & 1 deletion tests/test_6workflowrun.py
@@ -1003,14 +1003,15 @@ def test_retry(self):
                 }
                 runtime {
                     maxRetries: 99
+                    preemptible: 2
                 }
             }
         """
         outputs = self._test_workflow(txt)
         self.assertGreaterEqual(outputs["finish_time"], outputs["start_time"] + 20)
         self.assertTrue(os.path.isfile(os.path.join(self._rundir, "call-finish", "failed_tries", "0", "work", "iwuzhere")))
         cfg = WDL.runtime.config.Loader(logging.getLogger(self.id()), [])
-        cfg.override({"task_runtime": {"delete_work": "failure"}})
+        cfg.override({"task_runtime": {"delete_work": "failure", "_mock_interruptions": 2}})
         outputs = self._test_workflow(txt, cfg=cfg)
         self.assertGreaterEqual(outputs["finish_time"], outputs["start_time"] + 20)
         self.assertFalse(os.path.isdir(os.path.join(self._rundir, "call-finish", "failed_tries")))
