From 48954883f846bfe6bbf329963d33fe6a90f9b02d Mon Sep 17 00:00:00 2001 From: Mike Lin Date: Fri, 24 May 2019 11:45:26 -0700 Subject: [PATCH] task runtime skeleton (#134) Initial skeleton of the local task runtime, with proofs of concept for docker container management, file path mapping, and I/O stdlib functions like `stdout()` and `write_lines()`. --- .travis.yml | 3 +- Dockerfile | 3 +- Makefile | 3 +- WDL/Lint.py | 25 +- WDL/__init__.py | 3 +- WDL/_util.py | 30 +++ WDL/runtime/__init__.py | 3 + WDL/runtime/task.py | 505 +++++++++++++++++++++++++++++++++++++++ requirements.txt | 1 + stubs/docker/__init__.py | 32 +++ tests/test_5taskrun.py | 348 +++++++++++++++++++++++++++ 11 files changed, 926 insertions(+), 30 deletions(-) create mode 100644 WDL/_util.py create mode 100644 WDL/runtime/__init__.py create mode 100644 WDL/runtime/task.py create mode 100644 stubs/docker/__init__.py create mode 100644 tests/test_5taskrun.py diff --git a/.travis.yml b/.travis.yml index fa3a073b..69f70bfd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,5 +7,4 @@ script: - | set -ex -o pipefail make docker - docker run --env TRAVIS_JOB_ID=${TRAVIS_JOB_ID} --env TRAVIS_BRANCH=${TRAVIS_BRANCH} miniwdl coveralls - docker run --env TRAVIS_JOB_ID=${TRAVIS_JOB_ID} --env TRAVIS_BRANCH=${TRAVIS_BRANCH} miniwdl make sopretty + docker run --env TRAVIS_JOB_ID=${TRAVIS_JOB_ID} --env TRAVIS_BRANCH=${TRAVIS_BRANCH} -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp miniwdl bash -c "make sopretty && make && coveralls && make doc" diff --git a/Dockerfile b/Dockerfile index 8a021f5b..3fd70e0e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,8 +11,7 @@ RUN bash -o pipefail -c "pip3 install --user -r <(cat /miniwdl/requirements.txt # minor source change. ADD . /miniwdl WORKDIR /miniwdl -# Run the default make rule, which will trigger typechecking and tests. ENV PYTHONPATH $PYTHONPATH:/root/.local/lib/python3.6 ENV PATH $PATH:/root/.local/bin -RUN make && make doc +# will trigger typechecking & tests: CMD make diff --git a/Makefile b/Makefile index e3894f46..dc9c1efb 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,8 @@ sopretty: $(MAKE) pretty @git diff --quiet || (echo "ERROR: source files were modified by black; please fix up this commit with 'make pretty'"; exit 1) -# run tests in a docker image +# build docker image with current source tree, poised to run tests e.g.: +# docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp miniwdl docker: docker build -t miniwdl . 
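(For orientation: pieced together from WDL/runtime/task.py and tests/test_5taskrun.py below, the new entry point can be driven roughly as follows; the WDL source, input binding, and printed values are illustrative, and a local Docker daemon must be reachable.)

    import WDL

    # hypothetical one-task document; see tests/test_5taskrun.py for more
    doc = WDL.parse_document(r"""
    version 1.0
    task hello {
        input {
            String who
        }
        command <<<
            echo "Hello, ~{who}!"
        >>>
        output {
            String message = read_string(stdout())
        }
    }
    """)
    doc.typecheck()

    inputs = WDL.Env.bind([], [], "who", WDL.Value.String("Alyssa"))
    run_dir, outputs = WDL.runtime.run_local_task(doc.tasks[0], inputs)
    print(run_dir)                                        # e.g. ./20190524_114526_hello
    print(WDL.Env.resolve(outputs, [], "message").value)  # "Hello, Alyssa!\n"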
diff --git a/WDL/Lint.py b/WDL/Lint.py index 656205ff..2eff04b3 100644 --- a/WDL/Lint.py +++ b/WDL/Lint.py @@ -754,7 +754,7 @@ def task(self, obj: WDL.Task) -> Any: else: assert isinstance(part, str) command.append(part) - col_offset, command = _strip_leading_whitespace("".join(command)) + col_offset, command = WDL._util.strip_leading_whitespace("".join(command)) # write out a temp file with this fake script tfn = os.path.join(self._tmpdir, obj.name) @@ -830,29 +830,6 @@ def _shellcheck_dummy_value(ty, pos): return "x" * desired_length -def _strip_leading_whitespace(txt): - lines = txt.split("\n") - - to_strip = None - for line in lines: - lsl = len(line.lstrip()) - if lsl: - c = len(line) - lsl - assert c >= 0 - if to_strip is None or to_strip > c: - to_strip = c - # TODO: do something about mixed tabs & spaces - - if not to_strip: - return (0, txt) - - for i, line_i in enumerate(lines): - if line_i.lstrip(): - lines[i] = line_i[to_strip:] - - return (to_strip, "\n".join(lines)) - - @a_linter class MixedIndentation(Linter): # Line of task command mixes tab and space indentation diff --git a/WDL/__init__.py b/WDL/__init__.py index 7a9b02f2..9d4d98e3 100644 --- a/WDL/__init__.py +++ b/WDL/__init__.py @@ -3,8 +3,9 @@ import errno import inspect from typing import List, Optional, Callable -from WDL import _parser, Error, Type, Value, Env, Expr, Tree, Walker, StdLib +from WDL import _util, _parser, Error, Type, Value, Env, Expr, Tree, Walker, StdLib from WDL.Tree import Decl, StructTypeDef, Task, Call, Scatter, Conditional, Workflow, Document +import WDL.runtime SourcePosition = Error.SourcePosition SourceNode = Error.SourceNode diff --git a/WDL/_util.py b/WDL/_util.py new file mode 100644 index 00000000..23aa9811 --- /dev/null +++ b/WDL/_util.py @@ -0,0 +1,30 @@ +# pyre-strict +# misc utility functions... + +from typing import Tuple + + +def strip_leading_whitespace(txt: str) -> Tuple[int, str]: + # Given a multi-line string, determine the largest w such that each line + # begins with at least w whitespace characters. Return w and the string + # with w characters removed from the beginning of each line. 
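+    # e.g. strip_leading_whitespace("    echo hello\n      echo world\n")
+    #      == (4, "echo hello\n  echo world\n")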
+ lines = txt.split("\n") + + to_strip = None + for line in lines: + lsl = len(line.lstrip()) + if lsl: + c = len(line) - lsl + assert c >= 0 + if to_strip is None or to_strip > c: + to_strip = c + # TODO: do something about mixed tabs & spaces + + if not to_strip: + return (0, txt) + + for i, line_i in enumerate(lines): + if line_i.lstrip(): + lines[i] = line_i[to_strip:] + + return (to_strip, "\n".join(lines)) diff --git a/WDL/runtime/__init__.py b/WDL/runtime/__init__.py new file mode 100644 index 00000000..8be24881 --- /dev/null +++ b/WDL/runtime/__init__.py @@ -0,0 +1,3 @@ +# pyre-strict +import WDL.runtime.task +from WDL.runtime.task import run_local_task diff --git a/WDL/runtime/task.py b/WDL/runtime/task.py new file mode 100644 index 00000000..eb920413 --- /dev/null +++ b/WDL/runtime/task.py @@ -0,0 +1,505 @@ +# pyre-strict +import logging +import os +import tempfile +import json +import copy +import traceback +import docker +from datetime import datetime +from requests.exceptions import ReadTimeout +from abc import ABC, abstractmethod +from typing import NamedTuple, Tuple, List, Dict, Optional, Iterable +import WDL + + +class CommandError(WDL.Error.RuntimeError): + pass + + +class Terminated(WDL.Error.RuntimeError): + pass + + +class OutputError(WDL.Error.RuntimeError): + pass + + +class TaskFailure(WDL.Error.RuntimeError): + task_name: str + task_id: str + + def __init__(self, task_name: str, task_id: str) -> None: + super().__init__("task {} ({}) failed".format(task_name, task_id)) + self.task_name = task_name + self.task_id = task_id + + +class TaskContainer(ABC): + """ + Base class for task containers, subclassed by runtime-specific + implementations (e.g. Docker). + """ + + task_id: str + + host_dir: str + """ + :type: str + + The host path to the scratch directory that will be mounted inside the + container. + """ + + container_dir: str + """ + :type: str + + The scratch directory's mounted path inside the container. The task + command's working directory will be ``{container_dir}/work/``. + """ + + input_file_map: Dict[str, str] + """ + :type: Dict[str,str] + + A mapping of host input file paths to in-container mounted paths, + maintained by ``add_files``. + """ + + _running: bool + _terminate: bool + + def __init__(self, task_id: str, host_dir: str) -> None: + self.task_id = task_id + self.host_dir = host_dir + self.container_dir = "/mnt/miniwdl_task_container" + self.input_file_map = {} + self._running = False + self._terminate = False + + def add_files(self, host_files: List[str]) -> None: + """ + Use before running the container to add a list of host files to mount + inside the container as inputs. The host-to-container path mapping is + maintained in ``input_file_map``. + + Although ``add_files`` can be used multiple times, files should be + added together where possible, as this allows heuristics for dealing + with any name collisions among them. + """ + assert not self._running + ans = {} + basenames = {} + for fn in host_files: + if fn not in self.input_file_map: + bn = os.path.basename(fn) + basenames[bn] = 1 + basenames.get(bn, 0) + ans[fn] = os.path.join(self.container_dir, "inputs", bn) + + # Error out if any input filenames collide. + # TODO: assort them into separate subdirectories, also with a heuristic + # grouping together files that come from the same host directory. 
+ collisions = [bn for bn, ct in basenames.items() if ct > 1] + if collisions: + raise WDL.Error.InputError("input filename collision(s): " + " ".join(collisions)) + + for k, v in ans.items(): + assert k not in self.input_file_map + self.input_file_map[k] = v + + def run(self, logger: logging.Logger, command: str) -> None: + """ + 1. Container is instantiated + 2. Command is executed in ``{host_dir}/work/`` (where {host_dir} is mounted to {container_dir} inside the container) + 3. Standard output is written to ``{host_dir}/stdout.txt`` + 4. Standard error is written to ``{host_dir}/stderr.txt`` and logged at INFO level + 5. Raises CommandError for nonzero exit code, or any other error + + The container is torn down in any case, including SIGTERM/SIGHUP signal which is trapped. + """ + assert not self._running + self._running = True + # container-specific logic should be in _run(). this wrapper traps SIGTERM/SIGHUP + # and sets self._terminate + return self._run(logger, command) + + @abstractmethod + def _run(self, logger: logging.Logger, command: str) -> None: + raise NotImplementedError() + + def host_file(self, container_file: str) -> str: + """ + Map an output file's in-container path under ``container_dir`` to a + host path. + """ + if os.path.isabs(container_file): + # handle output of std{out,err}.txt + if container_file in [ + os.path.join(self.container_dir, pipe_file) + for pipe_file in ["stdout.txt", "stderr.txt"] + ]: + return os.path.join(self.host_dir, os.path.basename(container_file)) + # handle output of an input file + host_input_files = [ + host_input_file + for (host_input_file, container_input_file) in self.input_file_map.items() + if container_input_file == container_file + ] + if host_input_files: + return host_input_files[0] + # otherwise make sure the file is in/under the working directory + dpfx = os.path.join(self.container_dir, "work") + "/" + if not container_file.startswith(dpfx): + raise OutputError( + "task outputs attempted to use a file outside its working directory: " + + container_file + ) + # turn it into relative path + container_file = container_file[len(dpfx) :] + if container_file.startswith("..") or "/.." in container_file: + raise OutputError( + "task outputs attempted to use file path with .. 
uplevels: " + container_file + ) + # join the relative path to the host working directory + ans = os.path.join(self.host_dir, "work", container_file) + if not os.path.isfile(ans) or os.path.islink(ans): + raise OutputError("task output file not found: " + container_file) + return ans + + def _stdlib_base(self) -> WDL.StdLib.Base: + # - Invocations of write_* will use self.add_files + ans = WDL.StdLib.Base() + + def _write_lines(array: WDL.Value.Array, self: "TaskContainer" = self) -> WDL.Value.File: + host_fn = None + os.makedirs(os.path.join(self.host_dir, "write"), exist_ok=True) + with tempfile.NamedTemporaryFile( + prefix="lines_", dir=os.path.join(self.host_dir, "write"), delete=False + ) as outfile: + for item in array.value: + assert isinstance(item, WDL.Value.String) + outfile.write(item.value.encode("utf-8")) + outfile.write(b"\n") + host_fn = outfile.name + assert os.path.isabs(host_fn) + self.add_files([host_fn]) + return WDL.Value.File(self.input_file_map[host_fn]) + + setattr(getattr(ans, "write_lines"), "F", _write_lines) + return ans + + def stdlib_input(self) -> WDL.StdLib.Base: + """ + Produce a StdLib implementation suitable for evaluation of task input + declarations and command interpolation + """ + + # - Invocations of size(), read_* are permitted only on input files (no string coercions) + # - forbidden/undefined: stdout, stderr, glob + + ans = self._stdlib_base() + return ans + + def stdlib_output(self) -> WDL.StdLib.Base: + """ + Produce a StdLib implementation suitable for evaluation of task output + expressions + """ + + # - size(), read_* and glob are permitted only on paths in or under the container directory (cdup from working directory) + # - their argument has to be translated from container to host path to actually execute + + ans = self._stdlib_base() + setattr( + getattr(ans, "stdout"), + "F", + lambda container_dir=self.container_dir: WDL.Value.File( + os.path.join(container_dir, "stdout.txt") + ), + ) + setattr( + getattr(ans, "stderr"), + "F", + lambda container_dir=self.container_dir: WDL.Value.File( + os.path.join(container_dir, "stderr.txt") + ), + ) + + def _read_string( + container_file: WDL.Value.File, self: "TaskContainer" = self + ) -> WDL.Value.String: + host_file = self.host_file(container_file.value) + assert host_file.startswith(self.host_dir) + with open(host_file, "r") as infile: + return WDL.Value.String(infile.read()) + + setattr(getattr(ans, "read_string"), "F", _read_string) + + return ans + + +class TaskDockerContainer(TaskContainer): + """ + TaskContainer docker runtime + """ + + image_tag: str = "ubuntu:18.04" + """ + :type: str + + docker image tag (set as desired before running) + """ + + def _run(self, logger: logging.Logger, command: str) -> None: + with open(os.path.join(self.host_dir, "command"), "x") as outfile: + outfile.write(command) + pipe_files = ["stdout.txt", "stderr.txt"] + for touch_file in pipe_files: + with open(os.path.join(self.host_dir, touch_file), "x") as outfile: + pass + + volumes = {} + # mount input files and command read-only + for host_path, container_path in self.input_file_map.items(): + volumes[host_path] = {"bind": container_path, "mode": "ro"} + volumes[os.path.join(self.host_dir, "command")] = { + "bind": os.path.join(self.container_dir, "command"), + "mode": "ro", + } + # mount stdout, stderr, and working directory read/write + for pipe_file in pipe_files: + volumes[os.path.join(self.host_dir, pipe_file)] = { + "bind": os.path.join(self.container_dir, pipe_file), + "mode": "rw", + } + 
volumes[os.path.join(self.host_dir, "work")] = { + "bind": os.path.join(self.container_dir, "work"), + "mode": "rw", + } + logger.debug("docker volume map: " + str(volumes)) + + # connect to dockerd + client = docker.from_env() + try: + container = None + exit_info = None + + try: + # run container + logger.info("docker starting image {}".format(self.image_tag)) + container = client.containers.run( + self.image_tag, + command=[ + "/bin/bash", + "-c", + "/bin/bash ../command >> ../stdout.txt 2>> ../stderr.txt", + ], + detach=True, + auto_remove=True, + working_dir=os.path.join(self.container_dir, "work"), + volumes=volumes, + ) + logger.debug( + "docker container name = {}, id = {}".format(container.name, container.id) + ) + + # long-poll for container exit + while exit_info is None: + try: + exit_info = container.wait(timeout=1) + except Exception as exn: + # TODO: tail stderr.txt into logger + if self._terminate: + raise Terminated() from None + # workaround for docker-py not throwing the exception class + # it's supposed to + s_exn = str(exn) + if "Read timed out" not in s_exn and "Timeout" not in s_exn: + raise + logger.info("container exit info = " + str(exit_info)) + except: + # make sure to stop & clean up the container if we're stopping due + # to SIGTERM or something. Most other cases should be handled by + # auto_remove. + if container: + try: + container.remove(force=True) + except Exception as exn: + logger.error("failed to remove docker container: " + str(exn)) + raise + + # retrieve and check container exit status + assert exit_info + if "StatusCode" not in exit_info: + raise CommandError( + "docker finished without reporting exit status in: " + str(exit_info) + ) + if exit_info["StatusCode"] != 0: + raise CommandError("command exit status = " + str(exit_info["StatusCode"])) + finally: + try: + client.close() + except: + logger.error("failed to close docker-py client") + + +def run_local_task( + task: WDL.Task, + posix_inputs: WDL.Env.Values, + task_id: Optional[str] = None, + parent_dir: Optional[str] = None, +) -> Tuple[str, WDL.Env.Values]: + """ + Run a task locally. + + Inputs shall have been typechecked already. 
+ + File inputs are presumed to be local POSIX file paths that can be mounted into a container + """ + + parent_dir = parent_dir or os.getcwd() + + # formulate task ID & provision local directory + if task_id: + run_dir = os.path.join(parent_dir, task_id) + os.makedirs(run_dir, exist_ok=False) + else: + now = datetime.today() + task_id = now.strftime("%Y%m%d_%H%M%S") + "_" + task.name + try: + run_dir = os.path.join(parent_dir, task_id) + os.makedirs(run_dir, exist_ok=False) + except FileExistsError: + task_id = now.strftime("%Y%m%d_%H%M%S_") + str(now.microsecond) + "_" + task.name + run_dir = os.path.join(parent_dir, task_id) + os.makedirs(run_dir, exist_ok=False) + + # provision logger + logger = logging.getLogger("miniwdl_task:" + task_id) + logger.info("starting task") + logger.debug("task run directory " + run_dir) + + try: + # create appropriate TaskContainer + container = TaskDockerContainer(task_id, run_dir) + + # evaluate input/postinput declarations, including mapping from host to + # in-container file paths + container_env = _eval_task_inputs(logger, task, posix_inputs, container) + + # evaluate runtime.docker + image_tag_expr = task.runtime.get("docker", None) + if image_tag_expr: + assert isinstance(image_tag_expr, WDL.Expr.Base) + container.image_tag = image_tag_expr.eval(posix_inputs).value + + # interpolate command + command = WDL._util.strip_leading_whitespace( + task.command.eval(container_env, stdlib=container.stdlib_input()).value + )[1] + + # run container + container.run(logger, command) + + # evaluate output declarations + outputs = _eval_task_outputs(logger, task, container_env, container) + + logger.info("done") + return (run_dir, outputs) + except Exception as exn: + logger.debug(traceback.format_exc()) + wrapper = TaskFailure(task.name, task_id) + logger.error("{}: {}, {}".format(str(wrapper), exn.__class__.__name__, str(exn))) + raise wrapper from exn + + +def _eval_task_inputs( + logger: logging.Logger, task: WDL.Task, posix_inputs: WDL.Env.Values, container: TaskContainer +) -> WDL.Env.Values: + # Map all the provided input Files to in-container paths + # First make a pass to collect all the host paths and pass them to the + # container as a group (so that it can deal with any basename collisions) + host_files = [] + + def collect_host_files(v: WDL.Value.Base) -> None: + if isinstance(v, WDL.Value.File): + host_files.append(v.value) + for ch in v.children: + collect_host_files(ch) + + WDL.Env.map(posix_inputs, lambda namespace, binding: collect_host_files(binding.rhs)) + container.add_files(host_files) + + # copy posix_inputs with all Files mapped to their in-container paths + def map_files(v: WDL.Value.Base) -> WDL.Value.Base: + if isinstance(v, WDL.Value.File): + v.value = container.input_file_map[v.value] + for ch in v.children: + map_files(ch) + return v + + container_inputs = WDL.Env.map( + posix_inputs, lambda namespace, binding: map_files(copy.deepcopy(binding.rhs)) + ) + + # initialize value environment with the inputs + container_env = [] + for b in container_inputs: + assert isinstance(b, WDL.Env.Binding) + v = b.rhs + assert isinstance(v, WDL.Value.Base) + container_env = WDL.Env.bind(container_env, [], b.name, v) + vj = json.dumps(v.json) + logger.info("input {} -> {}".format(b.name, vj if len(vj) < 4096 else "(large)")) + + # collect remaining declarations requiring evaluation. 
+ decls_to_eval = [] + for decl in (task.inputs or []) + (task.postinputs or []): + try: + WDL.Env.resolve(container_env, [], decl.name) + except KeyError: + decls_to_eval.append(decl) + + # TODO: topsort decls_to_eval according to internal dependencies + + # evaluate each declaration in order + # note: the write_* functions call container.add_files as a side-effect + stdlib = container.stdlib_input() + for decl in decls_to_eval: + v = WDL.Value.Null() + if decl.expr: + v = decl.expr.eval(container_env, stdlib=stdlib) + else: + assert decl.type.optional + vj = json.dumps(v.json) + logger.info("eval {} -> {}".format(decl.name, vj if len(vj) < 4096 else "(large)")) + container_env = WDL.Env.bind(container_env, [], decl.name, v) + + return container_env + + +def _eval_task_outputs( + logger: logging.Logger, task: WDL.Task, env: WDL.Env.Values, container: TaskContainer +) -> WDL.Env.Values: + + outputs = [] + for decl in task.outputs: + assert decl.expr + v = WDL.Value.from_json( + decl.type, decl.expr.eval(env, stdlib=container.stdlib_output()).json + ) # TODO: are we happy with this coercion approach? + logger.info("output {} -> {}".format(decl.name, json.dumps(v.json))) + outputs = WDL.Env.bind(outputs, [], decl.name, v) + + # map Files from in-container paths to host paths + def map_files(v: WDL.Value.Base) -> WDL.Value.Base: + if isinstance(v, WDL.Value.File): + host_file = container.host_file(v.value) + logger.debug("File {} -> {}".format(v.value, host_file)) + v.value = host_file + for ch in v.children: + map_files(ch) + return v + + return WDL.Env.map(outputs, lambda namespace, binding: map_files(copy.deepcopy(binding.rhs))) diff --git a/requirements.txt b/requirements.txt index 1af9ba7c..68da10fa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ # for miniwdl development are listed in requirements.dev.txt. setuptools lark-parser==0.7.1 +docker diff --git a/stubs/docker/__init__.py b/stubs/docker/__init__.py new file mode 100644 index 00000000..bc46299f --- /dev/null +++ b/stubs/docker/__init__.py @@ -0,0 +1,32 @@ +from typing import Dict + +class Container: + @property + def name(self) -> str: + ... + + @property + def id(self) -> str: + ... + + def wait(self, timeout: int = None) -> Dict: + ... + + def remove(self, force: bool = False) -> None: + ... + +class Containers: + def run(self, image_tag: str, **kwargs) -> Container: + ... + +class Client: + @property + def containers() -> Containers: + ... + + def close() -> None: + ... + +def from_env() -> Client: + ... 
+ diff --git a/tests/test_5taskrun.py b/tests/test_5taskrun.py new file mode 100644 index 00000000..2caa1bae --- /dev/null +++ b/tests/test_5taskrun.py @@ -0,0 +1,348 @@ +import unittest +import logging +import tempfile +import os +import docker +from .context import WDL + +class TestTaskRunner(unittest.TestCase): + + def setUp(self): + logging.basicConfig(level=logging.DEBUG, format='%(name)s %(levelname)s %(message)s') + self._dir = tempfile.mkdtemp(prefix="miniwdl_test_taskrun_") + + def _test_task(self, wdl:str, inputs: WDL.Env.Values = None, expected_outputs: WDL.Env.Values = None, expected_exception: Exception = None): + doc = WDL.parse_document(wdl) + assert len(doc.tasks) == 1 + doc.typecheck() + if expected_exception: + try: + WDL.runtime.run_local_task(doc.tasks[0], (inputs or []), parent_dir=self._dir) + except WDL.runtime.task.TaskFailure as exn: + self.assertIsInstance(exn.__context__, expected_exception) + return exn.__context__ + self.assertFalse(str(expected_exception) + " not raised") + rundir, outputs = WDL.runtime.run_local_task(doc.tasks[0], (inputs or []), parent_dir=self._dir) + if expected_outputs is not None: + self.assertEqual(outputs, expected_outputs) + return outputs + + def test_docker(self): + outputs = self._test_task(R""" + version 1.0 + task hello { + command <<< + cat /etc/issue + >>> + output { + String issue = read_string(stdout()) + } + } + """) + self.assertTrue("18.04" in WDL.Env.resolve(outputs, [], "issue").value) + + outputs = self._test_task(R""" + version 1.0 + task hello { + command <<< + cat /etc/issue + >>> + runtime { + docker: "ubuntu:18.10" + } + output { + String issue = read_string(stdout()) + } + } + """) + self.assertTrue("18.10" in WDL.Env.resolve(outputs, [], "issue").value) + + outputs = self._test_task(R""" + version 1.0 + task hello { + String version + command <<< + cat /etc/issue + >>> + runtime { + docker: "ubuntu:" + version + } + output { + String issue = read_string(stdout()) + } + } + """, WDL.Env.bind([], [], "version", WDL.Value.String("18.10"))) + self.assertTrue("18.10" in WDL.Env.resolve(outputs, [], "issue").value) + + self._test_task(R""" + version 1.0 + task hello { + command <<< + cat /etc/issue + >>> + runtime { + docker: "nonexistent:202407" + } + } + """, expected_exception=docker.errors.ImageNotFound) + + def test_hello_blank(self): + self._test_task(R""" + version 1.0 + task hello_blank { + input { + String who + } + command <<< + echo "Hello, ~{who}!" + >>> + } + """, + WDL.Env.bind([], [], "who", WDL.Value.String("Alyssa"))) + + def test_hello_file(self): + with open(os.path.join(self._dir, "alyssa.txt"), "w") as outfile: + outfile.write("Alyssa") + outputs = self._test_task(R""" + version 1.0 + task hello_file { + input { + File who + } + command <<< + echo -n "Hello, $(cat ~{who})!" > message.txt + >>> + output { + File message = "message.txt" + } + } + """, + WDL.Env.bind([], [], "who", WDL.Value.File(os.path.join(self._dir, "alyssa.txt")))) + with open(WDL.Env.resolve(outputs, [], "message").value) as infile: + self.assertEqual(infile.read(), "Hello, Alyssa!") + + # output an input file + outputs = self._test_task(R""" + version 1.0 + task hello_file { + input { + File who + } + command <<< + echo -n "Hello, $(cat ~{who})!" 
+ >>> + output { + File who2 = who + } + } + """, + WDL.Env.bind([], [], "who", WDL.Value.File(os.path.join(self._dir, "alyssa.txt")))) + self.assertEqual(WDL.Env.resolve(outputs, [], "who2").value, os.path.join(self._dir, "alyssa.txt")) + + # stdout() + outputs = self._test_task(R""" + version 1.0 + task hello_file { + input { + File who + } + command <<< + echo -n "Hello, $(cat ~{who})!" + >>> + output { + File message = stdout() + } + } + """, + WDL.Env.bind([], [], "who", WDL.Value.File(os.path.join(self._dir, "alyssa.txt")))) + self.assertEqual(os.path.basename(WDL.Env.resolve(outputs, [], "message").value), "stdout.txt") + with open(WDL.Env.resolve(outputs, [], "message").value) as infile: + self.assertEqual(infile.read(), "Hello, Alyssa!") + + def test_weird_output_files(self): + # nonexistent output file + self._test_task(R""" + version 1.0 + task hello { + command {} + output { + File issue = "bogus.txt" + } + } + """, expected_exception=WDL.runtime.task.OutputError) + + # attempt to output file which exists but we're not allowed to output + self._test_task(R""" + version 1.0 + task hello { + command {} + output { + File issue = "/etc/issue" + } + } + """, expected_exception=WDL.runtime.task.OutputError) + + self._test_task(R""" + version 1.0 + task hello { + String trick = "/etc" + command {} + output { + File issue = trick + "/issue" + } + } + """, expected_exception=WDL.runtime.task.OutputError) + + self._test_task(R""" + version 1.0 + task hello { + command { + touch ../nono + } + output { + File issue = "../nono" + } + } + """, expected_exception=WDL.runtime.task.OutputError) + + # circuitously output a file using an absolute path + outputs = self._test_task(R""" + version 1.0 + task hello { + command { + echo -n $(pwd) > my_pwd + } + output { + File issue = read_string("my_pwd") + "/my_pwd" + } + } + """) + with open(WDL.Env.resolve(outputs, [], "issue").value) as infile: + pass + + def test_command_error(self): + self._test_task(R""" + version 1.0 + task hello { + command { + exit 1 + } + } + """, expected_exception=WDL.runtime.task.CommandError) + + def test_write_lines(self): + outputs = self._test_task(R""" + version 1.0 + task hello_friends { + input { + Array[String] friends + } + command <<< + awk '{printf(" Hello, %s!",$0)}' ~{write_lines(friends)} + >>> + output { + String messages = read_string(stdout()) + } + } + """, + WDL.Env.bind([], [], "friends", WDL.Value.from_json(WDL.Type.Array(WDL.Type.String()), ["Alyssa", "Ben"]))) + self.assertEqual(WDL.Env.resolve(outputs, [], "messages").value, " Hello, Alyssa! Hello, Ben!") + + outputs = self._test_task(R""" + version 1.0 + task hello_friends2 { + input { + Array[String] friends + } + File friends_txt = write_lines(friends) + command <<< + awk '{printf(" Hello, %s!",$0)}' ~{friends_txt} + >>> + output { + String messages = read_string(stdout()) + } + } + """, + WDL.Env.bind([], [], "friends", WDL.Value.from_json(WDL.Type.Array(WDL.Type.String()), ["Alyssa", "Ben"]))) + self.assertEqual(WDL.Env.resolve(outputs, [], "messages").value, " Hello, Alyssa! 
Hello, Ben!") + + def test_compound_files(self): + # tests filename mappings when Files are embedded in compound types + with open(os.path.join(self._dir, "alyssa.txt"), "w") as outfile: + outfile.write("Alyssa\n") + with open(os.path.join(self._dir, "ben.txt"), "w") as outfile: + outfile.write("Ben\n") + outputs = self._test_task(R""" + version 1.0 + task hello { + Array[File] files + command { + while read fn; do + cat "$fn" + done < ~{write_lines(files)} + echo -n Alyssa, > alyssa.csv + echo -n Ben, > ben.csv + } + output { + File stdout = stdout() + Array[File] friends = ["alyssa.csv", "ben.csv"] + } + } + """, WDL.Env.bind([], [], "files", WDL.Value.Array(WDL.Type.Array(WDL.Type.String), [ + WDL.Value.File(os.path.join(self._dir, "alyssa.txt")), + WDL.Value.File(os.path.join(self._dir, "ben.txt")), + ]))) + with open(WDL.Env.resolve(outputs, [], "stdout").value) as infile: + self.assertEqual(infile.read(), "Alyssa\nBen\n") + friends = WDL.Env.resolve(outputs, [], "friends") + self.assertEqual(len(friends.value), 2) + with open(friends.value[0].value) as infile: + self.assertEqual(infile.read(), "Alyssa,") + with open(friends.value[1].value) as infile: + self.assertEqual(infile.read(), "Ben,") + + def test_optional_inputs(self): + code = R""" + version 1.0 + task defaults { + input { + String s0 + String s1 = "ben" + String? s2 + } + command { + echo "~{s0}" + echo "~{s1}" + echo "~{if (defined(s2)) then s2 else 'None'}" + } + output { + String out = read_string(stdout()) + } + } + """ + outputs = self._test_task(code, WDL.Env.bind([], [], "s0", WDL.Value.String("alyssa"))) + self.assertEqual(WDL.Env.resolve(outputs, [], "out").value, "alyssa\nben\nNone\n") + + outputs = self._test_task(code, + WDL.Env.bind(WDL.Env.bind([], [], "s1", WDL.Value.String("cy")), + [], "s0", WDL.Value.String("alyssa"))) + self.assertEqual(WDL.Env.resolve(outputs, [], "out").value, "alyssa\ncy\nNone\n") + + outputs = self._test_task(code, + WDL.Env.bind(WDL.Env.bind([], [], "s2", WDL.Value.String("mallory")), + [], "s0", WDL.Value.String("alyssa"))) + self.assertEqual(WDL.Env.resolve(outputs, [], "out").value, "alyssa\nben\nmallory\n") + + # FIXME: need some restrictions on what File inputs can default to + self._test_task(R""" + version 1.0 + task hacker { + File host_passwords = "/etc/passwd" + command { + >&2 cat "~{host_passwords}" + } + output { + String owned = read_string(stderr()) + } + } + """)