Skip to content

Commit

Permalink
run_local_task option to copy input files and mount them read/write (#234)
Browse files Browse the repository at this point in the history

cf. #210 

This implements the option to `run_local_task` but doesn't yet expose it through `run_local_workflow` or the CLI, to avoid creating merge conflicts with #229 wip.
  • Loading branch information
mlin committed Sep 7, 2019
1 parent 34c4425 commit 98e3a3d
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 60 deletions.
2 changes: 1 addition & 1 deletion WDL/CLI.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def runner(
if isinstance(exn, runtime.task.CommandFailure) and not (
kwargs["verbose"] or kwargs["debug"]
):
logger.error("run with --verbose for standard error logging")
logger.error("run with --verbose to include task standard error streams in this log")
logger.error("see task's standard error in %s", getattr(exn, "stderr_file"))
if isinstance(getattr(exn, "pos", None), SourcePosition):
pos = getattr(exn, "pos")
Expand Down
104 changes: 77 additions & 27 deletions WDL/runtime/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
import time
import math
import multiprocessing
import shutil
from abc import ABC, abstractmethod
from typing import Tuple, List, Dict, Optional, Callable
from typing import Tuple, List, Dict, Optional, Callable, Iterable, Set

from requests.exceptions import ReadTimeout
import docker
Expand Down Expand Up @@ -68,7 +69,7 @@ def __init__(self, run_id: str, host_dir: str) -> None:
self.input_file_map = {}
self._running = False

def add_files(self, host_files: List[str]) -> None:
def add_files(self, host_files: Iterable[str]) -> None:
"""
Use before running the container to add a list of host files to mount
inside the container as inputs. The host-to-container path mapping is
Expand Down Expand Up @@ -190,6 +191,16 @@ class TaskDockerContainer(TaskContainer):
docker image tag (set as desired before running)
"""

rw_inputs_dir: bool = False
"""
:type: bool
By default, input files are individually mounted read-only using their original paths. If
rw_inputs_dir is True, then the ``inputs`` subdirectory of ``self.host_dir`` is expected to
contain all input files; this whole directory is then mounted read/write, instead of the
individual file mounts.
"""

def _run(
self, logger: logging.Logger, terminating: Callable[[], bool], command: str, cpu: int
) -> int:
Expand All @@ -201,9 +212,14 @@ def _run(
pass

mounts = []
# mount input files and command read-only
for host_path, container_path in self.input_file_map.items():
mounts.append(f"{host_path}:{container_path}:ro")
# mount input files and command
if self.rw_inputs_dir:
mounts.append(
f"{os.path.join(self.host_dir, 'inputs')}:{os.path.join(self.container_dir, 'inputs')}:rw"
)
else:
for host_path, container_path in self.input_file_map.items():
mounts.append(f"{host_path}:{container_path}:ro")
mounts.append(
f"{os.path.join(self.host_dir, 'command')}:{os.path.join(self.container_dir, 'command')}:ro"
)
Expand Down Expand Up @@ -299,6 +315,7 @@ def poll_service(
elif state not in [
"new",
"pending",
"ready",
"assigned",
"accepted",
"preparing",
Expand All @@ -314,6 +331,7 @@ def run_local_task(
posix_inputs: Env.Bindings[Value.Base],
run_id: Optional[str] = None,
run_dir: Optional[str] = None,
copy_input_files: bool = False,
) -> Tuple[str, Env.Bindings[Value.Base]]:
"""
Run a task locally.
Expand All @@ -325,6 +343,7 @@ def run_local_task(
:param run_dir: outputs and scratch will be stored in this directory if it doesn't already
exist; if it does, a timestamp-based subdirectory is created and used (defaults
to current working directory)
:param copy_input_files: copy input files and mount them read/write instead of read-only
"""

run_id = run_id or task.name
Expand Down Expand Up @@ -371,10 +390,15 @@ def run_local_task(

# interpolate command
command = _util.strip_leading_whitespace(
task.command.eval(container_env, stdlib=InputStdLib(container)).value
task.command.eval(container_env, stdlib=InputStdLib(logger, container)).value
)[1]
logger.debug("command:\n%s", command.rstrip())

# if needed, copy input files for r/w mounting
if copy_input_files:
_copy_input_files(logger, container)
container.rw_inputs_dir = True

# start container & run command
container.run(logger, command, cpu)

Expand Down Expand Up @@ -404,20 +428,9 @@ def _eval_task_inputs(
posix_inputs: Env.Bindings[Value.Base],
container: TaskContainer,
) -> Env.Bindings[Value.Base]:
# Map all the provided input Files to in-container paths
# First make a pass to collect all the host paths and pass them to the
# container as a group (so that it can deal with any basename collisions)
host_files = []

def collect_host_files(v: Value.Base) -> None:
if isinstance(v, Value.File):
host_files.append(v.value)
for ch in v.children:
collect_host_files(ch)

for binding in posix_inputs:
collect_host_files(binding.value)
container.add_files(host_files)
# Map all the provided input Files to in-container paths
container.add_files(_filenames(posix_inputs))

# copy posix_inputs with all Files mapped to their in-container paths
def map_files(v: Value.Base) -> Value.Base:
Expand Down Expand Up @@ -455,7 +468,7 @@ def map_files(v: Value.Base) -> Value.Base:

# evaluate each declaration in that order
# note: the write_* functions call container.add_files as a side-effect
stdlib = InputStdLib(container)
stdlib = InputStdLib(logger, container)
for decl in decls_to_eval:
assert isinstance(decl, Tree.Decl)
v = Value.Null()
Expand All @@ -478,11 +491,42 @@ def map_files(v: Value.Base) -> Value.Base:
return container_env


def _copy_input_files(logger: logging.Logger, container: TaskContainer) -> None:
    """
    Materialize the container's virtualized input mapping by copying each host input file to
    its corresponding location under ``host_dir/inputs``.

    Afterwards the whole ``inputs`` subdirectory can be bind-mounted read/write, instead of
    mounting each original host file individually read-only.
    """
    inputs_prefix = os.path.join(container.container_dir, "inputs") + "/"

    for src, mapped in container.input_file_map.items():
        # every virtualized path is expected to live under the container's inputs directory
        assert mapped.startswith(inputs_prefix)
        rel = os.path.relpath(mapped, inputs_prefix)
        dst = os.path.join(container.host_dir, "inputs", rel)
        logger.info("copy host input file %s -> %s", src, dst)
        os.makedirs(os.path.dirname(dst), exist_ok=True)
        shutil.copy(src, dst)


def _filenames(env: Env.Bindings[Value.Base]) -> Set[str]:
    """Collect the set of filenames referenced by all File values in ``env``."""
    seen: Set[str] = set()

    def visit(value: Value.Base) -> None:
        # record File values and recurse into compound values (arrays, maps, pairs, structs)
        if isinstance(value, Value.File):
            seen.add(value.value)
        for child in value.children:
            visit(child)

    for binding in env:
        visit(binding.value)
    return seen


def _eval_task_outputs(
logger: logging.Logger, task: Tree.Task, env: Env.Bindings[Value.Base], container: TaskContainer
) -> Env.Bindings[Value.Base]:

stdlib = OutputStdLib(container)
stdlib = OutputStdLib(logger, container)
outputs = Env.Bindings()
for decl in task.outputs:
assert decl.expr
Expand Down Expand Up @@ -515,34 +559,40 @@ def map_files(v: Value.Base) -> Value.Base:


class _StdLib(StdLib.Base):
logger: logging.Logger
container: TaskContainer
inputs_only: bool # if True then only permit access to input files

def __init__(self, container: TaskContainer, inputs_only: bool) -> None:
def __init__(self, logger: logging.Logger, container: TaskContainer, inputs_only: bool) -> None:
super().__init__(write_dir=os.path.join(container.host_dir, "write_"))
self.logger = logger
self.container = container
self.inputs_only = inputs_only

def _devirtualize_filename(self, filename: str) -> str:
# check allowability of reading this file, & map from in-container to host
return self.container.host_file(filename, inputs_only=self.inputs_only)
ans = self.container.host_file(filename, inputs_only=self.inputs_only)
self.logger.debug("read_ %s from host %s", filename, ans)
return ans

def _virtualize_filename(self, filename: str) -> str:
# register new file with container input_file_map
self.container.add_files([filename])
self.logger.debug("write_ host %s", filename)
self.logger.info("wrote %s", self.container.input_file_map[filename])
return self.container.input_file_map[filename]


class InputStdLib(_StdLib):
# StdLib for evaluation of task inputs and command
def __init__(self, container: TaskContainer) -> None:
super().__init__(container, True)
def __init__(self, logger: logging.Logger, container: TaskContainer) -> None:
super().__init__(logger, container, True)


class OutputStdLib(_StdLib):
# StdLib for evaluation of task outputs
def __init__(self, container: TaskContainer) -> None:
super().__init__(container, False)
def __init__(self, logger: logging.Logger, container: TaskContainer) -> None:
super().__init__(logger, container, False)

setattr(
self,
Expand Down
17 changes: 1 addition & 16 deletions WDL/runtime/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from .. import Env, Type, Value, Tree, StdLib
from ..Error import InputError
from .._util import write_values_json, provision_run_dir, LOGGING_FORMAT, install_coloredlogs
from .task import run_local_task
from .task import run_local_task, _filenames
from .error import TaskFailure


Expand Down Expand Up @@ -552,21 +552,6 @@ def _virtualize_filename(self, filename: str) -> str:
return filename


def _filenames(env: Env.Bindings[Value.Base]) -> Set[str]:
"Get the filenames of all File values in the environment"
ans = set()

def collector(v: Value.Base) -> None:
if isinstance(v, Value.File):
ans.add(v.value)
for ch in v.children:
collector(ch)

for b in env:
collector(b.value)
return ans


def run_local_workflow(
workflow: Tree.Workflow,
posix_inputs: Env.Bindings[Value.Base],
Expand Down
68 changes: 52 additions & 16 deletions tests/test_4taskrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ def setUp(self):
logging.basicConfig(level=logging.DEBUG, format='%(name)s %(levelname)s %(message)s')
self._dir = tempfile.mkdtemp(prefix="miniwdl_test_taskrun_")

def _test_task(self, wdl:str, inputs = None, expected_exception: Exception = None):
def _test_task(self, wdl:str, inputs = None, expected_exception: Exception = None, copy_input_files: bool = False):
WDL._util.ensure_swarm(logging.getLogger("test_task"))
try:
doc = WDL.parse_document(wdl)
assert len(doc.tasks) == 1
doc.typecheck()
if isinstance(inputs, dict):
inputs = WDL.values_from_json(inputs, doc.tasks[0].available_inputs, doc.tasks[0].required_inputs)
rundir, outputs = WDL.runtime.run_local_task(doc.tasks[0], (inputs or WDL.Env.Bindings()), run_dir=self._dir)
rundir, outputs = WDL.runtime.run_local_task(doc.tasks[0], (inputs or WDL.Env.Bindings()), run_dir=self._dir, copy_input_files=copy_input_files)
except WDL.runtime.TaskFailure as exn:
if expected_exception:
self.assertIsInstance(exn.__context__, expected_exception)
Expand Down Expand Up @@ -578,36 +578,42 @@ def test_filename_collisions(self):
outfile.write("x\n")
with open(os.path.join(self._dir, "b", "x.y"), "w") as outfile:
outfile.write("x.y\n")
outputs = self._test_task(R"""
txt = R"""
version 1.0
task t {
input {
Array[File] files
}
command {
sort "~{write_lines(files)}"
cat "~{write_lines(files)}"
}
output {
Array[String] outfiles = read_lines(stdout())
}
}
""", {"files": [
"""
inp = {"files": [
os.path.join(self._dir, "a", "x"),
os.path.join(self._dir, "a", "x.y"),
os.path.join(self._dir, "b", "x"),
os.path.join(self._dir, "b", "x.y"),
os.path.join(self._dir, "b", "x.y") # intentional duplicate
]})
outfiles = outputs["outfiles"]
self.assertEqual(len(outfiles), 5)
self.assertEqual(os.path.basename(outfiles[0]), "x")
self.assertEqual(os.path.basename(outfiles[1]), "x.y")
self.assertEqual(os.path.dirname(outfiles[0]), os.path.dirname(outfiles[1]))
self.assertEqual(os.path.basename(outfiles[2]), "x")
self.assertEqual(os.path.basename(outfiles[3]), "x.y")
self.assertEqual(os.path.dirname(outfiles[2]), os.path.dirname(outfiles[3]))
self.assertNotEqual(os.path.dirname(outfiles[0]), os.path.dirname(outfiles[2]))
self.assertEqual(outfiles[3], outfiles[4])
]}
def chk(outfiles):
self.assertEqual(len(outfiles), 5)
self.assertEqual(os.path.basename(outfiles[0]), "x")
self.assertEqual(os.path.basename(outfiles[1]), "x.y")
self.assertEqual(os.path.dirname(outfiles[0]), os.path.dirname(outfiles[1]))
self.assertEqual(os.path.basename(outfiles[2]), "x")
self.assertEqual(os.path.basename(outfiles[3]), "x.y")
self.assertEqual(os.path.dirname(outfiles[2]), os.path.dirname(outfiles[3]))
self.assertNotEqual(os.path.dirname(outfiles[0]), os.path.dirname(outfiles[2]))
self.assertEqual(outfiles[3], outfiles[4])

outputs = self._test_task(txt, inp)
chk(outputs["outfiles"])
outputs = self._test_task(txt, inp, copy_input_files=True)
chk(outputs["outfiles"])

def test_topsort(self):
txt = R"""
Expand Down Expand Up @@ -702,3 +708,33 @@ def test_cpu_limit(self):
# check task with overkill number of CPUs gets scheduled
outputs = self._test_task(txt, {"n": 8, "cpu": 9999})
self.assertLessEqual(outputs["wall_seconds"], 6)

def test_input_files_rw(self):
    # Task that mutates its input files in place: touches all of them, renames the first,
    # and deletes the second. With the default read-only input mounts this must fail; with
    # copy_input_files=True the inputs are copied and mounted read/write, so it succeeds.
    txt = R"""
version 1.0
task clobber {
input {
Array[File] files
}
command <<<
set -x
touch ~{sep=" " files}
mv ~{files[0]} alyssa2.txt
rm ~{files[1]}
>>>
output {
File outfile = glob("*.txt")[0]
}
}
"""
    # create two small input files on the host
    with open(os.path.join(self._dir, "alyssa.txt"), "w") as outfile:
        outfile.write("Alyssa\n")
    with open(os.path.join(self._dir, "ben.txt"), "w") as outfile:
        outfile.write("Ben\n")

    # default read-only mounts: in-place mutation of inputs fails the command
    self._test_task(txt, {"files": [os.path.join(self._dir, "alyssa.txt"), os.path.join(self._dir, "ben.txt")]},
                    expected_exception=WDL.runtime.task.CommandFailure)

    # copy_input_files=True: copies are mounted read/write, task completes and the renamed
    # copy is picked up by the glob() output
    outputs = self._test_task(txt, {"files": [os.path.join(self._dir, "alyssa.txt"), os.path.join(self._dir, "ben.txt")]},
                              copy_input_files=True)
    self.assertTrue(outputs["outfile"].endswith("alyssa2.txt"))

0 comments on commit 98e3a3d

Please sign in to comment.