Commit 18e041a

Merge d9d4b95 into bf6fd82
2 parents bf6fd82 + d9d4b95
mlin committed Nov 19, 2019
Showing 9 changed files with 149 additions and 138 deletions.
111 changes: 31 additions & 80 deletions WDL/CLI.py
@@ -373,42 +373,34 @@ def fill_run_subparser(subparsers):

def runner(
uri,
task=None,
inputs=[],
input_file=None,
json_only=False,
empty=[],
check_quant=True,
task=None,
rundir=None,
json_only=False,
path=None,
check_quant=True,
**kwargs,
):
# load WDL document
doc = load(uri, path or [], check_quant=check_quant, read_source=read_source)

# validate the provided inputs and prepare Cromwell-style JSON
# parse and validate the provided inputs
target, input_env, input_json = runner_input(doc, inputs, input_file, empty, task=task)

if json_only:
print(json.dumps(input_json, indent=2))
sys.exit(0)

run_kwargs = dict(
(k, kwargs[k])
for k in ["copy_input_files", "max_runtime_cpu", "max_runtime_memory", "as_me"]
)
if run_kwargs["max_runtime_memory"]:
run_kwargs["max_runtime_memory"] = parse_byte_size(run_kwargs["max_runtime_memory"])
if isinstance(target, Workflow):
run_kwargs["max_tasks"] = kwargs["max_tasks"]

# set up logging
level = NOTICE_LEVEL
if kwargs["verbose"]:
level = VERBOSE_LEVEL
if kwargs["debug"]:
level = logging.DEBUG
if kwargs["no_color"]:
os.environ["NO_COLOR"] = "" # picked up by _util.install_coloredlogs()
# picked up by _util.install_coloredlogs()
os.environ["NO_COLOR"] = os.environ.get("NO_COLOR", "")
logging.basicConfig(level=level)
logger = logging.getLogger("miniwdl-run")
install_coloredlogs(logger)
@@ -422,13 +414,21 @@ def runner(

rerun_sh = f"pushd {shellquote(os.getcwd())} && miniwdl {' '.join(shellquote(t) for t in sys.argv[1:])}; popd"

# configuration
run_kwargs = dict(
(k, kwargs[k])
for k in ["copy_input_files", "max_runtime_cpu", "max_runtime_memory", "as_me"]
)
if run_kwargs["max_runtime_memory"]:
run_kwargs["max_runtime_memory"] = parse_byte_size(run_kwargs["max_runtime_memory"])
if "rundir" in kwargs:
run_kwargs["run_dir"] = kwargs["rundir"]

ensure_swarm(logger)

# run & handle any errors
try:
entrypoint = (
runtime.run_local_task if isinstance(target, Task) else runtime.run_local_workflow
)
rundir, output_env = entrypoint(target, input_env, run_dir=rundir, **run_kwargs)
rundir, output_env = runtime.run(target, input_env, **run_kwargs)
except Exception as exn:
outer_rundir = None
inner_rundir = None
@@ -464,12 +464,11 @@ def runner(
raise
sys.exit(2)

# link output files
outputs_json = values_to_json(output_env, namespace=target.name)
runner_organize_outputs(target, {"outputs": outputs_json}, rundir)
# report
with open(os.path.join(rundir, "rerun"), "w") as rerunfile:
print(rerun_sh, file=rerunfile)

outputs_json = {"outputs": values_to_json(output_env, namespace=target.name), "dir": rundir}
print(json.dumps(outputs_json, indent=2))
return outputs_json


@@ -721,51 +720,6 @@ def runner_input_value(s_value, ty):
)


def runner_organize_outputs(target, outputs_json, rundir):
"""
After a successful workflow run, the output files are typically sprayed
across a bushy directory tree used for execution. To help the user find
what they're looking for, we create another directory tree with nicer
organization, containing symlinks to the output files (so as not to disturb
them).
One of the subtleties is to organize compound outputs like Array[File],
Array[Array[File]], etc.
"""
assert "dir" not in outputs_json
outputs_json["dir"] = rundir
print(json.dumps(outputs_json, indent=2))
with open(os.path.join(rundir, "outputs.json"), "w") as outfile:
print(json.dumps(outputs_json, indent=2), file=outfile)

os.makedirs(os.path.join(rundir, "output_links"), exist_ok=False)

def link_output_files(dn, files):
# dn: output directory which already exists
# files: either a filename str, or a [nested] list thereof
if isinstance(files, str) and os.path.exists(files):
os.symlink(files, os.path.join(dn, os.path.basename(files)))
if isinstance(files, list) and files:
d = int(math.ceil(math.log10(len(files)))) # how many digits needed
for i, elt in enumerate(files):
subdn = os.path.join(dn, str(i).rjust(d, "0"))
os.makedirs(subdn, exist_ok=False)
link_output_files(subdn, elt)

def output_links(binding):
fqon = ".".join([target.name, binding.name])
if _is_files(binding.value) and fqon in outputs_json["outputs"]:
odn = os.path.join(rundir, "output_links", fqon)
os.makedirs(os.path.join(rundir, odn), exist_ok=False)
link_output_files(odn, outputs_json["outputs"][fqon])
return True

for binding in target.effective_outputs:
output_links(binding)
# TODO: handle File's inside other compound types,
# Pair[File,File], Map[String,File], Structs, etc.


def fill_run_self_test_subparser(subparsers):
run_parser = subparsers.add_parser(
"run_self_test",
@@ -840,7 +794,7 @@ def run_self_test(**kwargs):
]
if kwargs["as_me"]:
argv.append("--as-me")
outputs = main(argv)
outputs = main(argv)["outputs"]

assert len(outputs["hello_caller.messages"]) == 2
assert outputs["hello_caller.messages"][0].rstrip() == "Hello, Alyssa P. Hacker!"
@@ -1035,7 +989,14 @@ def cromwell(
)
except:
die("failed to find outputs JSON in Cromwell standard output")
runner_organize_outputs(target, outputs_json, rundir)

assert "dir" not in outputs_json
outputs_json["dir"] = rundir
print(json.dumps(outputs_json, indent=2))
with open(os.path.join(rundir, "outputs.json"), "w") as outfile:
print(json.dumps(outputs_json["outputs"], indent=2), file=outfile)

runtime.make_output_links(outputs_json["outputs"], rundir)

sys.exit(proc.returncode)

@@ -1077,16 +1038,6 @@ def ensure_cromwell_jar(jarfile=None):
return jarpath


def _is_files(ty):
"""
is ty a File or an Array[File] or an Array[Array[File]] or an Array[Array[Array[File]]]...
"""
return isinstance(ty, Type.File) or (
isinstance(ty, Type.Array)
and (isinstance(ty.item_type, Type.File) or _is_files(ty.item_type))
)


def die(msg, status=2):
print("\n" + msg + "\n", file=sys.stderr)
sys.exit(status)
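With this refactor, runner() returns a dict of the form {"outputs": ..., "dir": ...} rather than the bare outputs JSON, which is why run_self_test now indexes main(argv)["outputs"]. A minimal sketch of driving the CLI programmatically under the new shape (the WDL file and input here are hypothetical, not part of this commit):

    from WDL.CLI import main

    # hypothetical invocation, equivalent to: miniwdl run hello.wdl who=Alyssa
    result = main(["run", "hello.wdl", "who=Alyssa"])
    print(result["dir"])      # the run directory
    print(result["outputs"])  # outputs JSON, keyed by the target task/workflow name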
1 change: 1 addition & 0 deletions WDL/__main__.py
@@ -1,5 +1,6 @@
# PYTHON_ARGCOMPLETE_OK
from . import CLI
import sys

if __name__ == "__main__":
CLI.main()
22 changes: 21 additions & 1 deletion WDL/runtime/__init__.py
@@ -1,6 +1,26 @@
# pyre-strict
from typing import Union, Dict, Tuple, Any
from .. import Tree, Value, Env
from . import task
from . import workflow
from .error import *
from .task import run_local_task
from .task import run_local_task, make_output_links
from .workflow import run_local_workflow


def run(
exe: Union[Tree.Task, Tree.Workflow],
inputs: Env.Bindings[Value.Base],
**run_kwargs: Dict[str, Any],
) -> Tuple[str, Env.Bindings[Value.Base]]:
"""
Run the task or workflow given the inputs environment, and any configuration arguments to
``run_local_{task,workflow}``.
``inputs`` may be parsed from a JSON dict using :func:`~WDL.values_from_json`. The
workflow/task name should NOT be used as a namespace for the input values.
"""
if "max_tasks" in run_kwargs and isinstance(exe, Tree.Task):
del run_kwargs["max_tasks"] # N/A to run_local_task
entrypoint = run_local_task if isinstance(exe, Tree.Task) else run_local_workflow
return entrypoint(exe, inputs, **run_kwargs) # pyre-ignore
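A minimal sketch of calling the new top-level entry point directly, per its docstring; the WDL file, input name, and run directory are hypothetical, and the values_from_json signature shown is an assumption based on its use elsewhere in the codebase:

    import WDL
    import WDL.runtime

    doc = WDL.load("hello.wdl")            # hypothetical document
    target = doc.workflow or doc.tasks[0]
    inputs = WDL.values_from_json(
        {"who": "Alyssa"},                 # no name-based namespace, per the docstring
        target.available_inputs,
        target.required_inputs,
    )
    run_dir, outputs = WDL.runtime.run(target, inputs, run_dir="/tmp/hello")
    print(WDL.values_to_json(outputs, namespace=target.name))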
56 changes: 54 additions & 2 deletions WDL/runtime/task.py
@@ -3,6 +3,7 @@
Local task runner
"""
import logging
import math
import os
import json
import copy
@@ -13,8 +14,9 @@
import threading
import shutil
import shlex
import re
from abc import ABC, abstractmethod
from typing import Tuple, List, Dict, Optional, Callable, Iterable, Set
from typing import Tuple, List, Dict, Optional, Callable, Iterable, Set, Any
import psutil
import docker
from .. import Error, Type, Env, Value, StdLib, Tree, _util
@@ -523,7 +525,11 @@ def run_local_task(
# evaluate output declarations
outputs = _eval_task_outputs(logger, task, container_env, container)

write_values_json(outputs, os.path.join(run_dir, "outputs.json"))
write_values_json(outputs, os.path.join(run_dir, "outputs.json"), namespace=task.name)

from .. import values_to_json

make_output_links(values_to_json(outputs, namespace=task.name), run_dir) # pyre-fixme
logger.notice("done") # pyre-fixme
return (run_dir, outputs)
except Exception as exn:
@@ -828,6 +834,52 @@ def rewrite_files(v: Value.Base, output_name: str) -> None:
return outputs


def make_output_links(outputs_json: Dict[str, Any], run_dir: str) -> None:
"""
Following a successful run, the output files may be scattered throughout a complex directory
tree used for execution. To help navigating this, generate a subdirectory of the run directory
containing nicely organized symlinks to the output files.
Given ``WDL.Env.Bindings[WDL.Value.Base]`` outputs, this expects to receive
``WDL.values_to_json(outputs, namespace=targets.name)`` instead of outputs directly. This makes
it compatible with Cromwell's output JSON too.
For security reasons, omits any files not inside run_dir (e.g. if the outputs include an input
file located elsewhere).
"""

def traverse(v: Any, dn: str) -> None: # pyre-fixme
assert isinstance(v, (str, int, float, list, dict)) or v is None
if (
isinstance(v, str)
and v.startswith(run_dir + "/")
and os.path.isfile(v)
and os.path.realpath(v).startswith(os.path.realpath(run_dir) + "/")
):
os.makedirs(dn, exist_ok=False)
os.symlink(v, os.path.join(dn, os.path.basename(v)))
elif isinstance(v, list) and len(v):
d = int(math.ceil(math.log10(len(v)))) # how many digits needed
for i, elt in enumerate(v):
traverse(elt, os.path.join(dn, str(i).rjust(d, "0")))
elif isinstance(v, dict):
# create a subdirectory for each key, as long as the key names seem to make reasonable
# path components; otherwise, treat the dict as a list of its values (this is possible
# in Maps where keys can be arbitrary)
if (
sum(1 for key in v if re.fullmatch("[-_a-zA-Z0-9][-_a-zA-Z0-9.]*", key) is None)
== 0
):
for key, value in v.items():
traverse(value, os.path.join(dn, key))
else:
traverse(list(v.values()), dn)

dn0 = os.path.join(run_dir, "output_links")
os.makedirs(dn0, exist_ok=False)
traverse(outputs_json, dn0)


class _StdLib(StdLib.Base):
logger: logging.Logger
container: TaskContainer
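To illustrate make_output_links, a small self-contained sketch (hypothetical file names; real runs pass the JSON produced by values_to_json). Note that a dict whose keys don't all look like safe path components — possible with Map keys — is instead flattened to a numbered list of its values:

    import os
    import tempfile
    from WDL.runtime import make_output_links

    run_dir = tempfile.mkdtemp()  # stand-in for a real run directory
    paths = []
    for i in range(2):
        p = os.path.join(run_dir, "hello%d.txt" % i)
        with open(p, "w") as f:
            print("Hello!", file=f)
        paths.append(p)

    # an Array[File] output under its fully-qualified name, as values_to_json emits it
    make_output_links({"hello_caller.messages": paths}, run_dir)

    # symlinks now exist under run_dir/output_links/hello_caller.messages/{0,1}/
    print(sorted(os.listdir(os.path.join(run_dir, "output_links", "hello_caller.messages"))))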
12 changes: 7 additions & 5 deletions WDL/runtime/workflow.py
@@ -40,12 +40,12 @@
import pickle
import threading
import copy
import pkg_resources
from concurrent import futures
from typing import Optional, List, Set, Tuple, NamedTuple, Dict, Union, Iterable, Callable, Any
import pkg_resources
from .. import Env, Type, Value, Tree, StdLib
from ..Error import InputError
from .task import run_local_task, _filenames
from .task import run_local_task, _filenames, make_output_links
from .download import able as downloadable, run as download
from .._util import (
write_values_json,
@@ -159,9 +159,7 @@ def __init__(

self.values_to_json = values_to_json # pyre-ignore

workflow_nodes = [
node for node in (workflow.inputs or []) + workflow.body + (workflow.outputs or [])
]
workflow_nodes = (workflow.inputs or []) + workflow.body + (workflow.outputs or [])
workflow_nodes.append(WorkflowOutputs(workflow))

# TODO: by topsorting all section bodies we can ensure that when we schedule an additional
@@ -749,6 +747,10 @@ def run_local_workflow(

assert state.outputs is not None
write_values_json(state.outputs, os.path.join(run_dir, "outputs.json"), namespace=workflow.name)

from .. import values_to_json

make_output_links(values_to_json(state.outputs, namespace=workflow.name), run_dir)
logger.notice("done")
return (run_dir, state.outputs)
