Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use_relative_output_paths #608

Merged
merged 21 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions WDL/CLI.py
Original file line number Diff line number Diff line change
Expand Up @@ -1375,6 +1375,7 @@ def run_self_test(**kwargs):
}
output {
Array[String] messages = select_all(msg)
Array[File] message_files = select_all(hello.message)
}
}
task hello {
Expand All @@ -1383,11 +1384,13 @@ def run_self_test(**kwargs):
}
command {
if grep -qv ^\# "${who}" ; then
echo "Hello, $(cat ${who})!" | tee message.txt 1>&2
name="$(cat ${who})"
mkdir messages
echo "Hello, $name!" | tee "messages/$name.txt" 1>&2
fi
}
output {
File? message = "message.txt"
File? message = select_first(flatten([glob("messages/*.txt"), ["nonexistent"]]))
}
runtime {
docker: "ubuntu:18.04"
Expand Down
10 changes: 10 additions & 0 deletions WDL/runtime/config_templates/default.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ copy_input_files_for = []
# with hardlinks instead of symlinks. Beware the potential confusion arising from files with
# multiple hardlinks! See also delete_work, below.
output_hardlinks = false
# The out/ links have a default directory structure reflecting the WDL output and field names.
# Setting use_relative_output_paths switches to an alternate structure reflecting paths within the
# task working directory, which can be more convenient in some settings (e.g. keeping tabix index
# adjacent to the data file). However, unlike the default structure, it will error out if there's a
# filename collision between outputs of different tasks.
# For example, a task has the following output:
# File my_report = "reports/subdir/myreport.txt".
# The default output link will be out/my_report/myreport.txt. With use_relative_output_paths=true,
# it will be out/reports/subdir/myreport.txt.
use_relative_output_paths = false
# Delete task working directory upon completion. The task container's working directory is a
# bind-mounted host directory, so files written into it are left behind after the container is torn
# down. If tasks write large non-output files into their working directory (instead of $TMPDIR as
Expand Down
101 changes: 90 additions & 11 deletions WDL/runtime/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ def run_local_task(
)
# create out/ and outputs.json
_outputs = link_outputs(
cache, cached, run_dir, hardlinks=cfg["file_io"].get_bool("output_hardlinks")
cache,
cached,
run_dir,
hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
)
write_values_json(
cached, os.path.join(run_dir, "outputs.json"), namespace=task.name
Expand Down Expand Up @@ -203,7 +207,11 @@ def run_local_task(

# create output_links
outputs = link_outputs(
cache, outputs, run_dir, hardlinks=cfg["file_io"].get_bool("output_hardlinks")
cache,
outputs,
run_dir,
hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
)

# process outputs through plugins
Expand Down Expand Up @@ -719,7 +727,11 @@ def raiser(exc: OSError):


def link_outputs(
cache: CallCache, outputs: Env.Bindings[Value.Base], run_dir: str, hardlinks: bool = False
cache: CallCache,
outputs: Env.Bindings[Value.Base],
run_dir: str,
hardlinks: bool = False,
use_relative_output_paths: bool = False,
) -> Env.Bindings[Value.Base]:
"""
Following a successful run, the output files may be scattered throughout a complex directory
Expand All @@ -728,6 +740,16 @@ def link_outputs(
outputs env to use these symlinks.
"""

def link1(target: str, link: str, directory: bool) -> None:
    """
    Create a single output link at ``link`` referring to ``target``.

    The enclosing function's ``hardlinks`` flag selects the mechanism: a symlink when it is false;
    otherwise a hard link for a file, or a copy of the directory tree made by shutil.copytree with
    link_force as its per-file copy function when ``directory`` is true.
    """
    if not hardlinks:
        symlink_force(target, link)
        return
    # TODO: what if target is an input from a different filesystem?
    if directory:
        # symlinks=True keeps symlinks inside the tree as symlinks instead of following them
        shutil.copytree(target, link, symlinks=True, copy_function=link_force)
    else:
        link_force(target, link)

def map_paths(v: Value.Base, dn: str) -> Value.Base:
if isinstance(v, (Value.File, Value.Directory)):
target = (
Expand All @@ -743,14 +765,7 @@ def map_paths(v: Value.Base, dn: str) -> Value.Base:
target = os.path.relpath(target, start=os.path.realpath(dn))
link = os.path.join(dn, os.path.basename(v.value.rstrip("/")))
os.makedirs(dn, exist_ok=False)
if hardlinks:
# TODO: what if target is an input from a different filesystem?
if isinstance(v, Value.Directory):
shutil.copytree(target, link, symlinks=True, copy_function=link_force)
else:
link_force(target, link)
else:
symlink_force(target, link)
link1(target, link, isinstance(v, Value.Directory))
# Drop a dotfile alongside Directory outputs, to inform a program crawling the out/
# directory without reference to the output types or JSON for whatever reason. It
# might otherwise have trouble distinguishing Directory outputs among the
Expand Down Expand Up @@ -794,6 +809,10 @@ def map_paths(v: Value.Base, dn: str) -> Value.Base:
return v

os.makedirs(os.path.join(run_dir, "out"), exist_ok=False)

if use_relative_output_paths:
return link_outputs_relative(link1, cache, outputs, run_dir, hardlinks=hardlinks)

return outputs.map(
lambda binding: Env.Binding(
binding.name,
Expand All @@ -802,6 +821,66 @@ def map_paths(v: Value.Base, dn: str) -> Value.Base:
)


def link_outputs_relative(
    link1: Callable[[str, str, bool], None],
    cache: CallCache,
    outputs: Env.Bindings[Value.Base],
    run_dir: str,
    hardlinks: bool = False,
) -> Env.Bindings[Value.Base]:
    """
    link_outputs with [file_io] use_relative_output_paths = true. We organize the links to reflect
    the generated files' paths relative to their task working directory.

    link1(target, link, directory) is the callback that actually creates each link. cache is
    consulted (get_download) for values that do not exist as local paths. Links are created under
    <run_dir>/out. hardlinks is accepted for signature parity with link_outputs but is not read
    here; the linking mechanism is entirely up to link1.

    Returns the outputs env with each linked File/Directory path replaced by its out/ link path.
    Raises FileExistsError if two different targets would occupy the same link path.
    """
    out_dir = os.path.join(run_dir, "out")
    work_dir = os.path.join(run_dir, "work")
    # link path -> resolved target already linked there during this call
    link_destinations = {}

    def map_path_relative(v: Union[Value.File, Value.Directory]) -> str:
        is_directory = isinstance(v, Value.Directory)
        target = v.value if os.path.exists(v.value) else cache.get_download(v.value, is_directory)
        if not target:
            # nothing to link; leave the value as it is
            return v.value

        real_target = os.path.realpath(target)
        rel_link = None
        if path_really_within(target, work_dir):
            # target was generated by current task; use its path relative to the task work dir
            if not os.path.basename(run_dir).startswith("download-"):  # except download tasks
                # real_target has symlinks resolved, so the work dir must be resolved too;
                # otherwise a symlink anywhere in run_dir yields a bogus "../.." relative path.
                rel_link = os.path.relpath(real_target, os.path.realpath(work_dir))
        else:
            # target is an out/ link generated by a call in the current workflow OR a cached
            # run; use the link's path relative to that out/ dir, which by induction should
            # equal its path relative to the original work/ dir.
            # we need heuristic to find the out/ dir in a task/workflow run directory, since the
            # user's cwd or the task-generated relative path might coincidentally have
            # something named 'out'.
            # NOTE(review): a reviewer raised that call-cached entries (which also have an out
            # dir) may need a dedicated case, and wondered whether the code can know that a
            # path was taken from the cache -- to be confirmed.
            out_pos = None
            # scan candidate "/out/" components right-to-left; accept the first one whose parent
            # directory looks like a run directory (contains task.log or workflow.log)
            for m in reversed(list(regex.finditer("/out(?=/)", target))):
                pos = m.span()[0]
                if pos and (
                    os.path.isfile(os.path.join(target[:pos], "task.log"))
                    or os.path.isfile(os.path.join(target[:pos], "workflow.log"))
                ):
                    out_pos = pos
                    break
            # 5 == len("/out/"); require something to follow the out/ component
            if out_pos and out_pos + 5 < len(target):
                rel_link = os.path.relpath(target, target[: out_pos + 5])
        # if neither of the above cases applies, then fall back to just the target basename
        # (trailing slash stripped so that a Directory path doesn't yield an empty basename)
        rel_link = rel_link or os.path.basename(target.rstrip("/"))
        abs_link = os.path.join(out_dir, rel_link)

        previous_target = link_destinations.get(abs_link)
        if previous_target is not None:
            if previous_target != real_target:
                raise FileExistsError(
                    "Output filename collision; to allow this, set"
                    " [file_io] use_relative_output_paths = false. Affected path: " + abs_link
                )
            # The same target was already linked here (e.g. one file appearing in several
            # outputs); don't link again, since re-creating a directory tree over an existing
            # destination would fail.
            return abs_link

        os.makedirs(os.path.dirname(abs_link), exist_ok=True)
        link1(real_target, abs_link, is_directory)
        link_destinations[abs_link] = real_target
        return abs_link

    return Value.rewrite_env_paths(outputs, map_path_relative)


def _delete_work(
cfg: config.Loader,
logger: logging.Logger,
Expand Down
13 changes: 11 additions & 2 deletions WDL/runtime/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,11 @@ def run_local_workflow(
)
)
_outputs = link_outputs(
cache, cached, run_dir, hardlinks=cfg["file_io"].get_bool("output_hardlinks")
cache,
cached,
run_dir,
hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
)
write_values_json(
cached, os.path.join(run_dir, "outputs.json"), namespace=workflow.name
Expand Down Expand Up @@ -932,7 +936,12 @@ def _workflow_main_loop(

# create output_links
outputs = link_outputs(
cache, state.outputs, run_dir, hardlinks=cfg["file_io"].get_bool("output_hardlinks")
cache,
state.outputs,
run_dir,
hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
# Relative output paths only make sense at the top level, and hence are only used here.
use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
)

# process outputs through plugins
Expand Down
9 changes: 8 additions & 1 deletion stubs/regex/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any
from typing import Any, Iterator, Tuple

POSIX: int
UNICODE: int
Expand All @@ -16,3 +16,10 @@ def compile(pattern, flags=0, **kwargs) -> Pattern:

def fullmatch(pat: str, string: str) -> Any:
...

class Match:
    """Minimal typing stub for the match objects yielded by finditer()."""

    def span(self) -> Tuple[int, int]:
        """Return a pair of ints; presumably the match's (start, end) offsets, as in re/regex."""
        ...

def finditer(pattern: str, string: str, flags: int = 0, **kwargs: Any) -> Iterator[Match]:
    """
    Typing stub for regex.finditer: iterate over the matches of pattern in string.

    flags and extra keyword arguments are accepted (mirroring the compile() stub in this module)
    so that type-checked callers may pass the options the real function supports.
    """
    ...
77 changes: 76 additions & 1 deletion tests/runner.t
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ source tests/bash-tap/bash-tap-bootstrap
export PYTHONPATH="$SOURCE_DIR:$PYTHONPATH"
miniwdl="python3 -m WDL"

plan tests 82
plan tests 89

$miniwdl run_self_test
is "$?" "0" "run_self_test"
Expand Down Expand Up @@ -163,6 +163,81 @@ is "$?" "0" "copy_source scatter_echo.wdl"
cmp -s echo_task.wdl scatterrun/wdl/echo_task.wdl
is "$?" "0" "copy_source echo_task.wdl"

cat << 'EOF' > create_files.wdl
version 1.0
task create_files {
command <<<
mkdir -p large_matryoshka/medium_matryoskha
touch large_matryoshka/medium_matryoskha/small_matryoshka
mkdir -p utensils
touch utensils/fork utensils/knife utensils/spoon
touch unboxed_item
mkdir -p turtle/turtle/turtle/turtle/turtle/turtle/
touch turtle/turtle/turtle/turtle/turtle/turtle/turtle
>>>
output {
File small_matryoshka = "large_matryoshka/medium_matryoskha/small_matryoshka"
File fork = "utensils/fork"
File knife = "utensils/knife"
File spoon = "utensils/spoon"
File unboxed_item = "unboxed_item"
File all_the_way_down = "turtle/turtle/turtle/turtle/turtle/turtle/turtle"
# Below arrays will create collisions as these are the same paths as above,
# localized to the same links. This should be handled properly.
Array[File] utensils = [fork, knife, spoon]
Array[File] everything = [small_matryoshka, fork, knife, spoon, unboxed_item, all_the_way_down]
}
}
EOF

cat << 'EOF' > correct_workflow.wdl
version 1.0
import "create_files.wdl" as create_files

workflow filecreator {
call create_files.create_files as createFiles {}

output {
Array[File] utensils = createFiles.utensils
Array[File] all_stuff = createFiles.everything
}
}
EOF

OUTPUT_DIR=use_relative_paths/_LAST/out/
MINIWDL__FILE_IO__USE_RELATIVE_OUTPUT_PATHS=true $miniwdl run --dir use_relative_paths correct_workflow.wdl
is "$?" "0" relative_output_paths
test -L $OUTPUT_DIR/large_matryoshka/medium_matryoskha/small_matryoshka
is "$?" "0" "outputs are relative"
test -d $OUTPUT_DIR/turtle/turtle/turtle/turtle
is "$?" "0" "outputs are relative all the way down"

MINIWDL__FILE_IO__USE_RELATIVE_OUTPUT_PATHS=true MINIWDL__FILE_IO__OUTPUT_HARDLINKS=true \
$miniwdl run --dir use_relative_paths correct_workflow.wdl
is "$?" "0" relative_output_paths_with_hardlink
test -f $OUTPUT_DIR/large_matryoshka/medium_matryoskha/small_matryoshka
is "$?" "0" "outputs are relative using hardlinks"
test -d $OUTPUT_DIR/turtle/turtle/turtle/turtle
is "$?" "0" "outputs are relative all the way down using hardlinks"

cat << 'EOF' > colliding_workflow.wdl
version 1.0
import "create_files.wdl" as create_files

workflow filecollider {
call create_files.create_files as createFiles {}
call create_files.create_files as createFiles2 {}
output {
Array[File] all_stuff1 = createFiles.everything
Array[File] all_stuff2 = createFiles2.everything
}
}
EOF

MINIWDL__FILE_IO__USE_RELATIVE_OUTPUT_PATHS=true $miniwdl run --dir use_relative_paths colliding_workflow.wdl 2> errors.txt
grep -q "Output filename collision" errors.txt
is "$?" "0" "use_relative_output_paths throws error on collisions"

cat << 'EOF' > failer2000.wdl
version 1.0

Expand Down
47 changes: 47 additions & 0 deletions tests/test_7runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1225,3 +1225,50 @@ def test_network_host(self):
{"docker_swarm": {"allow_networks": ["host"]}}
)
self._run(wdl, {}, cfg=cfg)


class TestRelativeOutputPaths(RunnerTestCase):
    """
    Basic coverage for [file_io] use_relative_output_paths; further tests for this feature are in
    runner.t.
    """

    # Workflow scattering task t over the given names; each call writes out/<name>.txt.
    wdl = """
version development
workflow w {
input {
Array[String] names
}
scatter (name in names) {
call t {
input: name
}
}
output {
Array[File] messages = t.message
}
}
task t {
input {
String name
}
command <<<
mkdir out
echo "Hello, ~{name}]" > 'out/~{name}.txt'
>>>
output {
File message = "out/~{name}.txt"
}
}
"""

    def _relative_paths_cfg(self):
        """Build a config loader with use_relative_output_paths switched on."""
        loader = WDL.runtime.config.Loader(logging.getLogger(self.id()), [])
        loader.override({"file_io": {"use_relative_output_paths": True}})
        return loader

    def test_ok(self):
        """Distinct names yield links under out/ mirroring the task-relative path out/<name>.txt."""
        results = self._run(
            self.wdl, {"names": ["Alyssa", "Ben"]}, cfg=self._relative_paths_cfg()
        )
        expected_suffixes = ["/out/out/Alyssa.txt", "/out/out/Ben.txt"]
        for index, suffix in enumerate(expected_suffixes):
            self.assertTrue(results["messages"][index].endswith(suffix))

    def test_collision(self):
        """Two calls producing the same relative path must fail the run."""
        colliding_cfg = self._relative_paths_cfg()
        self.assertRaises(
            WDL.runtime.error.RunFailed,
            self._run,
            self.wdl,
            {"names": ["Ben", "Ben"]},
            cfg=colliding_cfg,
        )