Skip to content

Commit

Permalink
Merge 8c973e1 into ecf3e3a
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Dec 16, 2019
2 parents ecf3e3a + 8c973e1 commit c9c49c2
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 52 deletions.
2 changes: 1 addition & 1 deletion WDL/CLI.py
Original file line number Diff line number Diff line change
Expand Up @@ -1033,7 +1033,7 @@ def cromwell(
with open(os.path.join(rundir, "outputs.json"), "w") as outfile:
print(json.dumps(outputs_json["outputs"], indent=2), file=outfile)

runtime.make_output_links(outputs_json["outputs"], rundir)
runtime.link_outputs(outputs_json["outputs"], rundir)

sys.exit(proc.returncode)

Expand Down
2 changes: 1 addition & 1 deletion WDL/runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from . import task
from . import workflow
from .error import *
from .task import run_local_task, make_output_links
from .task import run_local_task, link_outputs
from .workflow import run_local_workflow


Expand Down
86 changes: 50 additions & 36 deletions WDL/runtime/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ def run_local_task(
# write and link outputs
from .. import values_to_json

make_output_links(values_to_json(outputs, namespace=task.name), run_dir) # pyre-fixme
outputs = link_outputs(outputs, run_dir)
write_values_json(outputs, os.path.join(run_dir, "outputs.json"), namespace=task.name)

# make sure everything will be accessible to downstream tasks
Expand Down Expand Up @@ -930,50 +930,64 @@ def rewrite_files(v: Value.Base, output_name: str) -> None:
return outputs


def make_output_links(outputs_json: Dict[str, Any], run_dir: str) -> None:
def link_outputs(outputs: Env.Bindings[Value.Base], run_dir: str) -> Env.Bindings[Value.Base]:
"""
Following a successful run, the output files may be scattered throughout a complex directory
tree used for execution. To help navigating this, generate a subdirectory of the run directory
containing nicely organized symlinks to the output files.
Given ``WDL.Env.Bindings[WDL.Value.Base]`` outputs, this expects to receive
``WDL.values_to_json(outputs, namespace=targets.name)`` instead of outputs directly. This makes
it compatible with Cromwell's output JSON too.
For security reasons, omits any files not inside run_dir (e.g. if the outputs include an input
file located elsewhere)
containing nicely organized symlinks to the output files, and rewrite File values in the
outputs env to use these symlinks.
"""

def traverse(v: Any, dn: str) -> None: # pyre-fixme
assert isinstance(v, (str, int, float, list, dict)) or v is None
if (
isinstance(v, str)
and v.startswith(run_dir + "/")
and os.path.isfile(v)
and path_really_within(v, run_dir)
):
def map_files(v: Value.Base, dn: str) -> Value.Base:
if isinstance(v, Value.File):
hardlink = os.path.realpath(v.value)
assert os.path.isfile(hardlink)
symlink = os.path.join(dn, os.path.basename(v.value))
os.makedirs(dn, exist_ok=False)
os.symlink(v, os.path.join(dn, os.path.basename(v)))
elif isinstance(v, list) and v:
d = int(math.ceil(math.log10(len(v)))) # how many digits needed
for i, elt in enumerate(v):
traverse(elt, os.path.join(dn, str(i).rjust(d, "0")))
elif isinstance(v, dict):
os.symlink(hardlink, symlink)
v.value = symlink
# recurse into compound values
elif isinstance(v, Value.Array) and v.value:
d = int(math.ceil(math.log10(len(v.value)))) # how many digits needed
for i in range(len(v.value)):
v.value[i] = map_files(v.value[i], os.path.join(dn, str(i).rjust(d, "0")))
elif isinstance(v, Value.Map):
# create a subdirectory for each key, as long as the key names seem to make reasonable
# path components; otherwise, treat the dict as a list of its values (this is possible
# in Maps where keys can be arbitrary)
if (
sum(1 for key in v if re.fullmatch("[-_a-zA-Z0-9][-_a-zA-Z0-9.]*", key) is None)
# path components; otherwise, treat the dict as a list of its values
keys_ok = (
sum(
1
for b in v.value
if re.fullmatch("[-_a-zA-Z0-9][-_a-zA-Z0-9.]*", str(b[0])) is None
)
== 0
):
for key, value in v.items():
traverse(value, os.path.join(dn, key))
else:
traverse(list(v.values()), dn)
)
d = int(math.ceil(math.log10(len(v.value))))
for i, b in enumerate(v.value):
v.value[i] = (
b[0],
map_files(
b[1], os.path.join(dn, str(b[0]) if keys_ok else str(i).rjust(d, "0"))
),
)
elif isinstance(v, Value.Pair):
v.value = (
map_files(v.value[0], os.path.join(dn, "left")),
map_files(v.value[1], os.path.join(dn, "right")),
)
elif isinstance(v, Value.Struct):
for key in v.value:
v.value[key] = map_files(v.value[key], os.path.join(dn, key))
return v

dn0 = os.path.join(run_dir, "output_links")
os.makedirs(dn0, exist_ok=False)
traverse(outputs_json, dn0)
return outputs.map(
lambda binding: Env.Binding(
binding.name,
map_files(
copy.deepcopy(binding.value), os.path.join(run_dir, "output_links", binding.name)
),
)
)


class _StdLib(StdLib.Base):
Expand Down
4 changes: 2 additions & 2 deletions WDL/runtime/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import pkg_resources
from .. import Env, Type, Value, Tree, StdLib
from ..Error import InputError
from .task import run_local_task, _filenames, make_output_links
from .task import run_local_task, _filenames, link_outputs
from .download import able as downloadable, run as download
from .._util import (
write_values_json,
Expand Down Expand Up @@ -686,11 +686,11 @@ def run_local_workflow(
for tp in thread_pools:
tp.shutdown()

outputs = link_outputs(outputs, run_dir)
write_values_json(outputs, os.path.join(run_dir, "outputs.json"), namespace=workflow.name)

from .. import values_to_json

make_output_links(values_to_json(outputs, namespace=workflow.name), run_dir)
logger.notice("done") # pyre-fixme
return (run_dir, outputs)

Expand Down
8 changes: 4 additions & 4 deletions tests/runner.t
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ is "$(ls $f1)" "$f1" "task product brown file"
f1=$(jq -r '.["echo.out_f"][2]' taskrun/outputs.json)
is "$(basename $f1)" "fox" "task product fox"
is "$(ls $f1)" "$f1" "task product fox file"
is "$(ls taskrun/output_links/echo.out_f/2)" "fox" "task product fox link"
is "$(ls taskrun/output_links/out_f/2)" "fox" "task product fox link"

cat << 'EOF' > sleep.wdl
version 1.0
Expand Down Expand Up @@ -128,7 +128,7 @@ is "$(ls $f1)" "$f1" "workflow product brown file"
f1=$(jq -r '.["echo.t.out_f"][2]' workflowrun/outputs.json)
is "$(basename $f1)" "fox" "workflow product fox"
is "$(ls $f1)" "$f1" "workflow product fox file"
is "$(ls workflowrun/output_links/echo.t.out_f/2)" "fox" "workflow product fox link"
is "$(ls workflowrun/output_links/t.out_f/2)" "fox" "workflow product fox link"
is "$(cat workflowrun/rerun)" "pushd $DN && miniwdl run --dir workflowrun/. echo.wdl t.s=foo t.f=quick t.a_s=bar t.a_f=brown --empty a_s; popd"

cat << 'EOF' > scatter_echo.wdl
Expand All @@ -149,8 +149,8 @@ workflow echo {
EOF
$miniwdl run --dir scatterrun/. scatter_echo.wdl n=2 t.s=foo t.f=quick t.a_s=bar t.a_f=brown | tee stdout
is "$?" "0" "scatter run"
is "$(ls scatterrun/output_links/echo.t.out_f/0/2)" "fox" "scatter product 0 fox link"
is "$(ls scatterrun/output_links/echo.t.out_f/1/2)" "fox" "scatter product 1 fox link"
is "$(ls scatterrun/output_links/t.out_f/0/2)" "fox" "scatter product 0 fox link"
is "$(ls scatterrun/output_links/t.out_f/1/2)" "fox" "scatter product 1 fox link"
is "$(find scatterrun/ | xargs -n 1 stat -c %U | sort | uniq)" "$(whoami)" "scatter files all owned by $(whoami)"

cat << 'EOF' > failer2000.wdl
Expand Down
2 changes: 1 addition & 1 deletion tests/test_4taskrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def test_hello_file(self):
}
""",
{"who": os.path.join(self._dir, "alyssa.txt")})
self.assertEqual(outputs["who2"], os.path.join(self._dir, "alyssa.txt"))
self.assertEqual(os.path.realpath(outputs["who2"]), os.path.join(self._dir, "alyssa.txt"))

# stdout()
outputs = self._test_task(R"""
Expand Down
14 changes: 7 additions & 7 deletions tests/test_5stdlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,19 +358,19 @@ def test_glob(self):
}
""")
self.assertEqual(len(outputs["globs"][0]), 1)
self.assertTrue(outputs["globs"][0][0].endswith("/stuff/foo"))
self.assertTrue(outputs["globs"][0][0].endswith("/foo"))
self.assertEqual(len(outputs["globs"][1]), 4)
self.assertTrue(outputs["globs"][1][0].endswith("/stuff/bar"))
self.assertTrue(outputs["globs"][1][1].endswith("/stuff/bas"))
self.assertTrue(outputs["globs"][1][2].endswith("/stuff/bat"))
self.assertTrue(outputs["globs"][1][3].endswith("/stuff/baz"))
self.assertTrue(outputs["globs"][1][0].endswith("/bar"))
self.assertTrue(outputs["globs"][1][1].endswith("/bas"))
self.assertTrue(outputs["globs"][1][2].endswith("/bat"))
self.assertTrue(outputs["globs"][1][3].endswith("/baz"))
self.assertEqual(len(outputs["globs"][2]), 5)
self.assertTrue(outputs["globs"][2][4].endswith("/stuff/foo"))
self.assertTrue(outputs["globs"][2][4].endswith("/foo"))
self.assertEqual(len(outputs["globs"][3]), 0)
for g in outputs["globs"] + [[outputs["f1"]]]:
for fn in g:
assert os.path.isfile(fn), fn
self.assertTrue(outputs["f1"].endswith("/stuff/foo"))
self.assertTrue(outputs["f1"].endswith("/foo"))

self._test_task(R"""
version 1.0
Expand Down

0 comments on commit c9c49c2

Please sign in to comment.