Commit

Merge 9caaefb into 1a223a9
mlin committed Apr 2, 2020
2 parents 1a223a9 + 9caaefb · commit 9e1ca5e
Showing 3 changed files with 87 additions and 25 deletions.
23 changes: 17 additions & 6 deletions WDL/runtime/config_templates/default.cfg
@@ -30,19 +30,17 @@ auto_init = true
[file_io]
# During startup, require the run directory and input files to reside somewhere within this root
# directory. This constraint is needed in cluster deployments relying on a shared mount. It can
- # also be useful on a single node to prevent inadvertent consumption of a small boot/home volume,
+ # also be useful on a single node to prevent accidental consumption of a small boot/home volume,
# by confining workflow operations to some spacious scratch/data mount.
root = /
# Populate task working directory with writable copies of the input files, instead of mounting them
# in situ & read-only. Needed if tasks want to write/move/rename input files, but costs time and
# disk space. --copy-input-files
copy_input_files = false
# Each succeeded run directory has an "output_links/" folder containing (by default) a symbolic
- # link to each output file in its original working location. If output_hardlinks is set, then
- # output_links/ is populated with hardlinks instead of symlinks.
- # With output_hardlinks, one may clean up any garbage left behind in a task's run directory by
- # simply "rm -rf ./work/" while the hardlinks retain the output files. Otherwise, beware the
- # potential confusion arising from files with multiple hardlinks.
+ # link to each output file in its original working location. If output_hardlinks is true, then
+ # output_links/ is populated with hardlinks instead of symlinks. Beware the potential confusion
+ # arising from files with multiple hardlinks! See also [task_runtime] delete_work, below.
output_hardlinks = false


@@ -60,6 +58,19 @@ defaults = {
# Run the command script as the invoking user's uid:gid instead of the default root. More
# secure, but interferes with commands that assume root access, e.g. apt-get. --as-me
as_user = false
+ # Delete task working directory upon completion. The task container's working directory is a
+ # bind-mounted host directory, so files written into it are left behind after the container is torn
+ # down. If tasks write large non-output files into their working directory (instead of $TMPDIR as
+ # they should), then it can be useful to delete them automatically.
+ # Values:
+ # false = never delete (default)
+ # success = delete working directories of succeeded tasks
+ # failure = delete working directories of failed tasks
+ # always = delete working directories of both succeeded and failed tasks
# The "success" and "always" settings require [file_io] output_hardlinks, above, to be true;
# otherwise, output files would be deleted too. Input/output JSON, logs, and stdout/stderr are
# always retained in the task run directory (above the container working directory).
delete_work = false


[download_cache]
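
The new delete_work option is driven through miniwdl's config Loader. A minimal sketch of enabling it programmatically, mirroring the override pattern used by the test_delete_work test below (the logger name here is arbitrary):

    import logging
    import WDL.runtime.config

    logger = logging.getLogger("delete_work_demo")  # arbitrary name
    cfg = WDL.runtime.config.Loader(logger, [])
    # "success"/"always" take effect only with hardlinked outputs; otherwise
    # deleting work/ would delete the output files themselves.
    cfg.override({
        "file_io": {"output_hardlinks": True},
        "task_runtime": {"delete_work": "success"},
    })
    # cfg then flows into WDL.runtime.run_local_task(cfg, task, inputs, run_dir=...)
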
50 changes: 32 additions & 18 deletions WDL/runtime/task.py
Expand Up @@ -424,6 +424,7 @@ def _run(
client = docker.from_env(timeout=900)
resources, user, groups = self.misc_config(logger, client, cpu, memory)
svc = None
+ exit_code = None
try:
# run container as a transient docker swarm service, letting docker handle the resource
# scheduling (waiting until requested # of CPUs are available).
@@ -450,7 +451,6 @@
svc = client.services.create(self.image_tag, **kwargs)
logger.debug(_("docker service", name=svc.name, id=svc.short_id))

- exit_code = None
# stream stderr into log
with PygtailLogger(logger, os.path.join(self.host_dir, "stderr.txt")) as poll_stderr:
# poll for container exit
@@ -480,7 +480,7 @@ def _run(
svc.remove()
except:
logger.exception("failed to remove docker service")
- self.chown(logger, client)
+ self.chown(logger, client, exit_code == 0)
try:
client.close()
except:
@@ -611,15 +611,14 @@ def poll_service(

return None

- def chown(self, logger: logging.Logger, client: docker.DockerClient) -> None:
+ def chown(self, logger: logging.Logger, client: docker.DockerClient, success: bool) -> None:
"""
After task completion, chown all files in the working directory to the invoking user:group,
instead of leaving them frequently owned by root or some other arbitrary user id (image-
dependent). We do this in a funny way via Docker; see GitHub issue #271 for discussion of
alternatives and their problems.
"""
if not self.cfg["task_runtime"].get_bool("as_user") and (os.geteuid() or os.getegid()):
- t_0 = time.monotonic()
script = f"""
chown -RP {os.geteuid()}:{os.getegid()} {shlex.quote(os.path.join(self.container_dir, 'work'))}
""".strip()
@@ -646,16 +645,9 @@ def chown(self, logger: logging.Logger, client: docker.DockerClient) -> None:
if chowner:
chowner.remove()
except:
logger.exception("post-task chown failed")
finally:
t_delta = time.monotonic() - t_0
if t_delta >= 60:
logger.warning(
_(
"post-task chown was slow (may indicate excessive file count and/or IOPS exhaustion)",
seconds=int(t_delta),
)
)
if success:
raise
logger.exception("post-task chown also failed")


def run_local_task(
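
Context omitted by the hunks above: per the docstring, the chown is delegated to Docker, since the unprivileged invoking process may not own the root-created files. A self-contained sketch of that general pattern (illustrative image, paths, and container handling, not miniwdl's exact code):

    import os
    import shlex

    import docker  # docker-py, as used throughout task.py

    client = docker.from_env()
    host_run_dir = "/tmp/miniwdl_run/mytask"  # illustrative path
    script = f"chown -RP {os.geteuid()}:{os.getegid()} {shlex.quote('/mnt/run/work')}"
    # A throwaway root container owns the bind mount just long enough to hand
    # the files back to the invoking user, then is removed.
    client.containers.run(
        "alpine:3",
        ["/bin/sh", "-c", script],
        volumes={host_run_dir: {"bind": "/mnt/run", "mode": "rw"}},
        remove=True,
    )
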
@@ -702,9 +694,10 @@ def run_local_task(
)
)
logger.info(_("thread", ident=threading.get_ident()))
+ cache = _cache or CallCache(cfg, logger)
+
try:
- cache = _cache or CallCache(cfg, logger)
-
# start plugin coroutines and process inputs through them
with compose_coroutines(
[
@@ -757,9 +750,6 @@
# evaluate output declarations
outputs = _eval_task_outputs(logger, task, container_env, container)

- # make sure everything will be accessible to downstream tasks
- chmod_R_plus(container.host_dir, file_bits=0o660, dir_bits=0o770)
-
# create output_links
outputs = link_outputs(
outputs, run_dir, hardlinks=cfg["file_io"].get_bool("output_hardlinks")
@@ -769,6 +759,11 @@
recv = plugins.send({"outputs": outputs})
outputs = recv["outputs"]

+ # clean up, if so configured, and make sure output files will be accessible to
+ # downstream tasks
+ _delete_work(cfg, logger, run_dir, True)
+ chmod_R_plus(run_dir, file_bits=0o660, dir_bits=0o770)
+
# write outputs.json
write_values_json(
outputs, os.path.join(run_dir, "outputs.json"), namespace=task.name
@@ -787,6 +782,10 @@
except Exception as exn2:
logger.debug(traceback.format_exc())
logger.critical(_("failed to write error.json", dir=run_dir, message=str(exn2)))
+ try:
+ _delete_work(cfg, logger, run_dir, False)
+ except:
+ logger.exception("delete_work also failed")
raise wrapper from exn


@@ -1174,6 +1173,21 @@ def map_files(v: Value.Base, dn: str) -> Value.Base:
)


+ def _delete_work(cfg: config.Loader, logger: logging.Logger, run_dir: str, success: bool) -> None:
+ opt = cfg["task_runtime"]["delete_work"].strip().lower()
+ if opt == "always" or (success and opt == "success") or (not success and opt == "failure"):
+ if success and not cfg["file_io"].get_bool("output_hardlinks"):
+ logger.warning(
+ "ignoring configuration [task_runtime] delete_work because it requires [file_io] output_hardlinks = true"
+ )
+ return
+ for dn in ["write_", "work"]:
+ dn = os.path.join(run_dir, dn)
+ if os.path.isdir(dn):
+ shutil.rmtree(dn)
+ logger.info(_("deleted working directory", dir=dn))
+
+
class _StdLib(StdLib.Base):
logger: logging.Logger
container: TaskContainer
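
_delete_work (above) refuses "success"-mode deletion unless [file_io] output_hardlinks is true because a hardlink is a second directory entry for the same inode: hardlinked outputs survive shutil.rmtree of work/, while symlinked ones would dangle. A quick self-contained illustration of that distinction (temporary paths, not miniwdl code):

    import os
    import shutil
    import tempfile

    root = tempfile.mkdtemp()
    work = os.path.join(root, "work")
    links = os.path.join(root, "output_links")
    os.makedirs(work)
    os.makedirs(links)

    out = os.path.join(work, "result.txt")
    with open(out, "w") as f:
        f.write("payload\n")

    os.link(out, os.path.join(links, "result.txt"))     # hardlink: same inode, new name
    os.symlink(out, os.path.join(links, "result.sym"))  # symlink: a pointer to the path

    shutil.rmtree(work)  # what delete_work does to the task working directory

    assert os.path.isfile(os.path.join(links, "result.txt"))      # hardlink survives
    assert not os.path.exists(os.path.join(links, "result.sym"))  # symlink dangles
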
39 changes: 38 additions & 1 deletion tests/test_4taskrun.py
@@ -32,6 +32,7 @@ def _test_task(self, wdl:str, inputs = None, expected_exception: Exception = Non
if isinstance(inputs, dict):
inputs = WDL.values_from_json(inputs, doc.tasks[0].available_inputs, doc.tasks[0].required_inputs)
rundir, outputs = WDL.runtime.run_local_task(cfg, doc.tasks[0], (inputs or WDL.Env.Bindings()), run_dir=self._dir, **kwargs)
+ self._rundir = rundir
except WDL.runtime.RunFailed as exn:
if expected_exception:
self.assertIsInstance(exn.__context__, expected_exception)
@@ -920,7 +921,43 @@ def test_workdir_ownership(self):
outputs = self._test_task(txt, {"files": [os.path.join(self._dir, "alyssa.txt"), os.path.join(self._dir, "ben.txt")]})
self.assertEqual(len(outputs["uids"]), 1)
self.assertEqual(outputs["uids"][0], os.geteuid())
-
+
+ def test_delete_work(self):
+ txt = R"""
+ version 1.0
+ task xxx {
+ input {
+ Array[File] files
+ }
+ File written = write_lines(files)
+ command <<<
+ set -euxo pipefail
+ cp "~{written}" foo.txt
+ cp "~{files[0]}" bar.txt
+ >>>
+ output {
+ Array[File] outfiles = [write_lines(files), "foo.txt", "bar.txt"]
+ }
+ }
+ """
+ with open(os.path.join(self._dir, "alyssa.txt"), "w") as outfile:
+ outfile.write("Alyssa\n")
+ with open(os.path.join(self._dir, "ben.txt"), "w") as outfile:
+ outfile.write("Ben\n")
+ inputs = {"files": [os.path.join(self._dir, "alyssa.txt"), os.path.join(self._dir, "ben.txt")]}
+ cfg = WDL.runtime.config.Loader(logging.getLogger(self.id()), [])
+ cfg.override({"file_io": {"output_hardlinks": True}, "task_runtime": {"delete_work": "success"}})
+ output = self._test_task(txt, inputs, cfg=cfg)
+ self.assertFalse(os.path.isdir(os.path.join(self._rundir, "work")))
+ self.assertFalse(os.path.isdir(os.path.join(self._rundir, "write_")))
+ for fn in output["outfiles"]:
+ self.assertTrue(os.path.isfile(fn) and not os.path.islink(fn))
+
+ cfg = WDL.runtime.config.Loader(logging.getLogger(self.id()), [])
+ cfg.override({"file_io": {"output_hardlinks": False}, "task_runtime": {"delete_work": "success"}})
+ output = self._test_task(txt, inputs, cfg=cfg)
+ self.assertTrue(os.path.isfile(os.path.join(self._rundir, "work", "foo.txt")))
+
def test_plugins(self):
def my_plugin(cfg, logger, task, run_id, run_dir, **recv):
logger = logger.getChild("my_plugin")
