Skip to content

Commit

Permalink
support runtime.returnCodes per WDL 1.1 spec
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed May 7, 2021
1 parent 92c1176 commit 1555e50
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 7 deletions.
19 changes: 17 additions & 2 deletions WDL/runtime/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ def _eval_task_runtime(
container: "runtime.task_container.TaskContainer",
env: Env.Bindings[Value.Base],
stdlib: StdLib.Base,
) -> Dict[str, Union[int, str]]:
) -> Dict[str, Union[int, str, List[int], List[str]]]:
runtime_values = {}
for key, v in cfg["task_runtime"].get_dict("defaults").items():
if isinstance(v, str):
Expand Down Expand Up @@ -495,13 +495,28 @@ def _eval_task_runtime(
ans["maxRetries"] = max(0, runtime_values["maxRetries"].coerce(Type.Int()).value)
if "preemptible" in runtime_values:
ans["preemptible"] = max(0, runtime_values["preemptible"].coerce(Type.Int()).value)
if "returnCodes" in runtime_values:
rcv = runtime_values["returnCodes"]
if isinstance(rcv, Value.String) and rcv.value == "*":
ans["returnCodes"] = "*"
elif isinstance(rcv, Value.Int):
ans["returnCodes"] = rcv.value
elif isinstance(rcv, Value.Array):
try:
ans["returnCodes"] = [v.coerce(Type.Int()).value for v in rcv.value]
except:
pass
if "returnCodes" not in ans:
raise Error.EvalError(
task.runtime["returnCodes"], "invalid setting of runtime.returnCodes"
)

if ans:
logger.info(_("effective runtime", **ans))
unused_keys = list(
key
for key in runtime_values
if key not in ("cpu", "memory", "docker", "container") and key not in ans
if key not in ("memory", "docker", "container") and key not in ans
)
if unused_keys:
logger.warning(_("ignored runtime settings", keys=unused_keys))
Expand Down
18 changes: 14 additions & 4 deletions WDL/runtime/task_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,20 +217,28 @@ def run(self, logger: logging.Logger, command: str) -> None:
raise Terminated(quiet=True)
self._running = True
try:
exit_status = self._run(logger, terminating, command)
exit_code = self._run(logger, terminating, command)
finally:
self._running = False

if exit_status != 0:
if not self.success_exit_code(exit_code):
raise CommandFailed(
exit_status, self.host_stderr_txt(), more_info=self.failure_info
exit_code, self.host_stderr_txt(), more_info=self.failure_info
) if not terminating() else Terminated()

@abstractmethod
def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command: str) -> int:
# run command in container & return exit status
raise NotImplementedError()

def success_exit_code(self, exit_code: int) -> bool:
if "returnCodes" not in self.runtime_values:
return exit_code == 0
rcv = self.runtime_values["returnCodes"]
if isinstance(rcv, str) and rcv == "*":
return True
return exit_code in (rcv if isinstance(rcv, list) else [rcv])

def delete_work(self, logger: logging.Logger, delete_streams: bool = False) -> None:
"""
After the container exits, delete all filesystem traces of it except for task.log. That
Expand Down Expand Up @@ -645,7 +653,9 @@ def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command:
if attempt >= server_error_retries:
break
time.sleep(polling_period)
self.chown(logger, client, exit_code == 0)
self.chown(
logger, client, isinstance(exit_code, int) and self.success_exit_code(exit_code)
)
client.close()

def resolve_tag(
Expand Down
2 changes: 1 addition & 1 deletion docs/runner_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ The runner supports versions 1.1, 1.0, and draft-2 of the [WDL specification](ht
* `Object` type is unsupported except for initializing WDL 1.0+ `struct` types, which should be used instead.
* The `read_object()` and `read_objects()` library functions are available *only* for initializing structs and `Map[String,String]`
* Task may only *output* files created within/beneath its container's initial working directory, not e.g. under `/tmp` ([#214](https://github.com/chanzuckerberg/miniwdl/issues/214))
* The following task runtime values are ignored: `gpu` `disks` `returnCodes`
* The following task runtime values are ignored: `disks` `gpu`
* Rejects certain name collisions that Cromwell admits (spec-ambiguous), such as between:
* scatter variable and prior declaration
* output declaration and prior non-output declaration
Expand Down
64 changes: 64 additions & 0 deletions tests/test_4taskrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,70 @@ def test_runtime_memory_limit(self):
outputs = self._test_task(txt, {"memory": "256MB"}, cfg=cfg)
self.assertLess(outputs["memory_limit_in_bytes"], 300*1024*1024)

def test_runtime_returnCodes(self):
txt = R"""
version 1.0
task limit {
input {
Int status
}
command <<<
echo Hi
exit ~{status}
>>>
output {
File out = stdout()
}
runtime {
returnCodes: 42
}
}
"""
cfg = WDL.runtime.config.Loader(logging.getLogger(self.id()), [])
self._test_task(txt, {"status": 0}, cfg=cfg, expected_exception=WDL.runtime.CommandFailed)
self._test_task(txt, {"status": 42}, cfg=cfg)
txt = R"""
version 1.0
task limit {
input {
Int status
}
command <<<
echo Hi
exit ~{status}
>>>
output {
File out = stdout()
}
runtime {
returnCodes: [0,42]
}
}
"""
self._test_task(txt, {"status": 0}, cfg=cfg)
self._test_task(txt, {"status": 42}, cfg=cfg)
self._test_task(txt, {"status": 41}, cfg=cfg, expected_exception=WDL.runtime.CommandFailed)
txt = R"""
version 1.0
task limit {
input {
Int status
}
command <<<
echo Hi
exit ~{status}
>>>
output {
File out = stdout()
}
runtime {
returnCodes: "*"
}
}
"""
self._test_task(txt, {"status": 0}, cfg=cfg)
self._test_task(txt, {"status": 42}, cfg=cfg)

def test_input_files_rw(self):
txt = R"""
version 1.0
Expand Down

0 comments on commit 1555e50

Please sign in to comment.