Add a use_relative_output_paths parameter that flattens output directories. #606

Closed · wants to merge 13 commits
6 changes: 6 additions & 0 deletions WDL/runtime/config_templates/default.cfg
@@ -92,6 +92,12 @@ output_hardlinks = false
# output files would be deleted too. Input/output JSON, logs, and stdout/stderr are always retained
# in the task run directory (above the container working directory).
delete_work = false
# Mirror each task's output directory structure: place output links according to the files'
# paths relative to the task working directory. For example, given a task output
#     File my_report = "reports/subdir/myreport.txt"
# the default layout links it at out/my_report/myreport.txt, whereas with
# use_relative_output_paths=true it is linked at out/reports/subdir/myreport.txt.
use_relative_output_paths = false
# Suggest that each task's temporary directory should reside within the mounted task working
# directory, instead of the storage backing the container's root filesystem. The latter (default)
# is usually preferred because the working directory is more likely to reside on slower network-
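To make the contrast concrete, here is a minimal sketch (the output name and paths are the hypothetical ones from the comment above) of where the same output file lands under each setting:

import os

# hypothetical task output: File my_report = "reports/subdir/myreport.txt"
name, rel_path = "my_report", "reports/subdir/myreport.txt"

# default layout: one subdirectory per output binding, named after the binding
default_link = os.path.join("out", name, os.path.basename(rel_path))
print(default_link)   # out/my_report/myreport.txt

# use_relative_output_paths = true: mirror the path relative to the work directory
relative_link = os.path.join("out", rel_path)
print(relative_link)  # out/reports/subdir/myreport.txt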
69 changes: 63 additions & 6 deletions WDL/runtime/task.py
@@ -125,7 +125,11 @@ def run_local_task(
)
# create out/ and outputs.json
_outputs = link_outputs(
cache, cached, run_dir, hardlinks=cfg["file_io"].get_bool("output_hardlinks")
cache,
cached,
run_dir,
hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
)
write_values_json(
cached, os.path.join(run_dir, "outputs.json"), namespace=task.name
@@ -203,7 +207,11 @@ def run_local_task(

# create output_links
outputs = link_outputs(
cache, outputs, run_dir, hardlinks=cfg["file_io"].get_bool("output_hardlinks")
cache,
outputs,
run_dir,
hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
)

# process outputs through plugins
@@ -719,14 +727,19 @@ def raiser(exc: OSError):


def link_outputs(
cache: CallCache, outputs: Env.Bindings[Value.Base], run_dir: str, hardlinks: bool = False
cache: CallCache,
outputs: Env.Bindings[Value.Base],
run_dir: str,
hardlinks: bool = False,
use_relative_output_paths: bool = False,
) -> Env.Bindings[Value.Base]:
"""
Following a successful run, the output files may be scattered throughout a complex directory
tree used for execution. To help navigating this, generate a subdirectory of the run directory
containing nicely organized symlinks to the output files, and rewrite File values in the
outputs env to use these symlinks.
"""
link_destinations: Dict[str, str] = dict()

def map_paths(v: Value.Base, dn: str) -> Value.Base:
if isinstance(v, (Value.File, Value.Directory)):
@@ -738,11 +751,27 @@ def map_paths(v: Value.Base, dn: str) -> Value.Base:
if target:
target = os.path.realpath(target)
assert os.path.exists(target)
if use_relative_output_paths:
# Find the task's "work" directory component in the target's path and
# recreate the directory structure below it under the output directory
dir_parts = os.path.dirname(target).split(os.sep)
for i, part in enumerate(reversed(dir_parts)):
# also accept an "out" component, since with hardlinks the target may
# already reside under a previous run's out/ directory
if part == "work" or (part == "out" and hardlinks):
dn = os.path.join(dn, *dir_parts[len(dir_parts) - i :])
break
if not hardlinks and path_really_within(target, os.path.dirname(run_dir)):
# make symlink relative
target = os.path.relpath(target, start=os.path.realpath(dn))
link = os.path.join(dn, os.path.basename(v.value.rstrip("/")))
os.makedirs(dn, exist_ok=False)
if link_destinations.get(link, target) != target:
raise FileExistsError(
f"Two files have the same link destination: "
f"{link_destinations[link]} and {target} are both "
f"written to {link}."
)
link_destinations[link] = target
os.makedirs(dn, exist_ok=use_relative_output_paths)
if hardlinks:
# TODO: what if target is an input from a different filesystem?
if isinstance(v, Value.Directory):
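In outline, the new branch locates the work directory component within the resolved target path and grafts everything below it onto out/. A self-contained sketch of that rewrite, using a hypothetical run-directory path (the real code mutates dn in place rather than returning a path):

import os

def relative_subdir(out_dir: str, target: str, hardlinks: bool = False) -> str:
    """Sketch of the loop above: scan the target's directory components from the
    right for "work" (or "out", when hardlinking) and append everything below
    that component to the output directory."""
    parts = os.path.dirname(target).split(os.sep)
    for i, part in enumerate(reversed(parts)):
        if part == "work" or (part == "out" and hardlinks):
            return os.path.join(out_dir, *parts[len(parts) - i:])
    return out_dir  # no work/ component found: link directly under out/

# hypothetical absolute path to an output file inside a task run directory
target = "/runs/mywf/call-create_files/work/reports/subdir/myreport.txt"
print(relative_subdir("out", target))  # prints: out/reports/subdir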
@@ -793,11 +822,39 @@ def map_paths(v: Value.Base, dn: str) -> Value.Base:
v.value[key] = map_paths(v.value[key], os.path.join(dn, key))
return v

os.makedirs(os.path.join(run_dir, "out"), exist_ok=False)
def map_paths_relative(v: Value.Base, dn: str) -> Value.Base:
if isinstance(v, (Value.File, Value.Directory)):
# Delegate to map_paths so the same linking and collision checks apply.
return map_paths(v, dn)
elif isinstance(v, Value.Array) and v.value:
for i in range(len(v.value)):
v.value[i] = map_paths_relative(v.value[i], dn)
elif isinstance(v, Value.Map):
for i, b in enumerate(v.value):
v.value[i] = (b[0], map_paths_relative(b[1], dn))
elif isinstance(v, Value.Pair):
v.value = (
map_paths_relative(v.value[0], dn),
map_paths_relative(v.value[1], dn),
)
elif isinstance(v, Value.Struct):
for key in v.value:
v.value[key] = map_paths_relative(v.value[key], dn)
return v

out_dir = os.path.join(run_dir, "out")
os.makedirs(out_dir, exist_ok=False)

if use_relative_output_paths:
return outputs.map(
lambda binding: Env.Binding(
binding.name, map_paths_relative(copy.deepcopy(binding.value), out_dir)
)
)
return outputs.map(
lambda binding: Env.Binding(
binding.name,
map_paths(copy.deepcopy(binding.value), os.path.join(run_dir, "out", binding.name)),
map_paths(copy.deepcopy(binding.value), os.path.join(out_dir, binding.name)),
)
)

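Because distinct outputs can now resolve to the same out/ path, link_outputs records each link's destination and raises only when two different targets claim one link; exact duplicates (the same file referenced by several output bindings) pass through. A condensed sketch of that bookkeeping, with hypothetical paths:

from typing import Dict

link_destinations: Dict[str, str] = {}

def record_link(link: str, target: str) -> None:
    # Tolerate re-linking the same target at the same path (e.g. a File that is
    # also a member of an Array[File] output); reject two different targets.
    if link_destinations.get(link, target) != target:
        raise FileExistsError(
            f"Two files have the same link destination: "
            f"{link_destinations[link]} and {target} are both written to {link}."
        )
    link_destinations[link] = target

record_link("out/utensils/fork", "/runs/a/work/utensils/fork")  # first link
record_link("out/utensils/fork", "/runs/a/work/utensils/fork")  # duplicate: ok
record_link("out/utensils/fork", "/runs/b/work/utensils/fork")  # raises FileExistsError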
13 changes: 11 additions & 2 deletions WDL/runtime/workflow.py
@@ -780,7 +780,11 @@ def run_local_workflow(
)
)
_outputs = link_outputs(
cache, cached, run_dir, hardlinks=cfg["file_io"].get_bool("output_hardlinks")
cache,
cached,
run_dir,
hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
)
write_values_json(
cached, os.path.join(run_dir, "outputs.json"), namespace=workflow.name
@@ -932,7 +936,12 @@ def _workflow_main_loop(

# create output_links
outputs = link_outputs(
cache, state.outputs, run_dir, hardlinks=cfg["file_io"].get_bool("output_hardlinks")
cache,
state.outputs,
run_dir,
hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
# Relative output paths only make sense at the top level, so the option is applied only here.
use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
)

# process outputs through plugins
77 changes: 76 additions & 1 deletion tests/runner.t
@@ -11,7 +11,7 @@ source tests/bash-tap/bash-tap-bootstrap
export PYTHONPATH="$SOURCE_DIR:$PYTHONPATH"
miniwdl="python3 -m WDL"

plan tests 82
plan tests 89

$miniwdl run_self_test
is "$?" "0" "run_self_test"
@@ -163,6 +163,81 @@ is "$?" "0" "copy_source scatter_echo.wdl"
cmp -s echo_task.wdl scatterrun/wdl/echo_task.wdl
is "$?" "0" "copy_source echo_task.wdl"

cat << 'EOF' > create_files.wdl
version 1.0
task create_files {
command <<<
mkdir -p large_matryoshka/medium_matryoshka
touch large_matryoshka/medium_matryoshka/small_matryoshka
mkdir -p utensils
touch utensils/fork utensils/knife utensils/spoon
touch unboxed_item
mkdir -p turtle/turtle/turtle/turtle/turtle/turtle/
touch turtle/turtle/turtle/turtle/turtle/turtle/turtle
>>>
output {
File small_matryoshka = "large_matryoshka/medium_matryoshka/small_matryoshka"
File fork = "utensils/fork"
File knife = "utensils/knife"
File spoon = "utensils/spoon"
File unboxed_item = "unboxed_item"
File all_the_way_down = "turtle/turtle/turtle/turtle/turtle/turtle/turtle"
# The arrays below reference the same paths as above, so they map to the same
# links. Identical target/link pairs must be tolerated rather than treated as collisions.
Array[File] utensils = [fork, knife, spoon]
Array[File] everything = [small_matryoshka, fork, knife, spoon, unboxed_item, all_the_way_down]
}
}
EOF

cat << 'EOF' > correct_workflow.wdl
version 1.0
import "create_files.wdl" as create_files

workflow filecreator {
call create_files.create_files as createFiles {}

output {
Array[File] utensils = createFiles.utensils
Array[File] all_stuff = createFiles.everything
}
}
EOF
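The runs below enable the new option via miniwdl's environment-variable configuration overrides (MINIWDL__FILE_IO__USE_RELATIVE_OUTPUT_PATHS, and later MINIWDL__FILE_IO__OUTPUT_HARDLINKS), which is equivalent to setting use_relative_output_paths and output_hardlinks in the [file_io] section of a configuration file.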

OUTPUT_DIR=use_relative_paths/_LAST/out/
MINIWDL__FILE_IO__USE_RELATIVE_OUTPUT_PATHS=true $miniwdl run --dir use_relative_paths correct_workflow.wdl
is "$?" "0" relative_output_paths
test -L $OUTPUT_DIR/large_matryoshka/medium_matryoshka/small_matryoshka
is "$?" "0" "outputs are relative"
test -d $OUTPUT_DIR/turtle/turtle/turtle/turtle
is "$?" "0" "outputs are relative all the way down"

MINIWDL__FILE_IO__USE_RELATIVE_OUTPUT_PATHS=true MINIWDL__FILE_IO__OUTPUT_HARDLINKS=true \
$miniwdl run --dir use_relative_paths correct_workflow.wdl
is "$?" "0" relative_output_paths_with_hardlink
test -f $OUTPUT_DIR/large_matryoshka/medium_matryoshka/small_matryoshka
is "$?" "0" "outputs are relative using hardlinks"
test -d $OUTPUT_DIR/turtle/turtle/turtle/turtle
is "$?" "0" "outputs are relative all the way down using hardlinks"

cat << 'EOF' > colliding_workflow.wdl
version 1.0
import "create_files.wdl" as create_files

workflow filecollider {
call create_files.create_files as createFiles {}
call create_files.create_files as createFiles2 {}
output {
Array[File] all_stuff1 = createFiles.everything
Array[File] all_stuff2 = createFiles2.everything
}
}
EOF

MINIWDL__FILE_IO__USE_RELATIVE_OUTPUT_PATHS=true $miniwdl run --dir use_relative_paths colliding_workflow.wdl 2> errors.txt
grep -q "Two files have the same link destination" errors.txt
is "$?" "0" "use_relative_output_paths throws error on collisions"

cat << 'EOF' > failer2000.wdl
version 1.0
