diff --git a/WDL/runtime/config_templates/default.cfg b/WDL/runtime/config_templates/default.cfg index bb3b6cd7..40b7d8b0 100644 --- a/WDL/runtime/config_templates/default.cfg +++ b/WDL/runtime/config_templates/default.cfg @@ -92,6 +92,12 @@ output_hardlinks = false # output files would be deleted too. Input/output JSON, logs, and stdout/stderr are always retained # in the task run directory (above the container working directory). delete_work = false +# Create an output directory structure based on the positions of output files relative to the +# working directory for each task. For example, a task has the following output: +# File my_report = "reports/subdir/myreport.txt". +# By default this will be in out/my_report/myreport.txt. With use_relative_output_paths=true +# it will be in out/reports/subdir/myreport.txt. +use_relative_output_paths = false # Suggest that each task's temporary directory should reside within the mounted task working # directory, instead of the storage backing the container's root filesystem. 
The latter (default) # is usually preferred because the working directory is more likely to reside on slower network- diff --git a/WDL/runtime/task.py b/WDL/runtime/task.py index c157b594..1b9cc7b1 100644 --- a/WDL/runtime/task.py +++ b/WDL/runtime/task.py @@ -125,7 +125,11 @@ def run_local_task( ) # create out/ and outputs.json _outputs = link_outputs( - cache, cached, run_dir, hardlinks=cfg["file_io"].get_bool("output_hardlinks") + cache, + cached, + run_dir, + hardlinks=cfg["file_io"].get_bool("output_hardlinks"), + use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"), ) write_values_json( cached, os.path.join(run_dir, "outputs.json"), namespace=task.name @@ -203,7 +207,11 @@ def run_local_task( # create output_links outputs = link_outputs( - cache, outputs, run_dir, hardlinks=cfg["file_io"].get_bool("output_hardlinks") + cache, + outputs, + run_dir, + hardlinks=cfg["file_io"].get_bool("output_hardlinks"), + use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"), ) # process outputs through plugins @@ -719,7 +727,11 @@ def raiser(exc: OSError): def link_outputs( - cache: CallCache, outputs: Env.Bindings[Value.Base], run_dir: str, hardlinks: bool = False + cache: CallCache, + outputs: Env.Bindings[Value.Base], + run_dir: str, + hardlinks: bool = False, + use_relative_output_paths: bool = False, ) -> Env.Bindings[Value.Base]: """ Following a successful run, the output files may be scattered throughout a complex directory @@ -727,6 +739,7 @@ def link_outputs( containing nicely organized symlinks to the output files, and rewrite File values in the outputs env to use these symlinks. 
""" + link_destinations: Dict[str, str] = dict() def map_paths(v: Value.Base, dn: str) -> Value.Base: if isinstance(v, (Value.File, Value.Directory)): @@ -738,11 +751,27 @@ def map_paths(v: Value.Base, dn: str) -> Value.Base: if target: target = os.path.realpath(target) assert os.path.exists(target) + if use_relative_output_paths: + # Look for the highest-level "work" directory and make sure + # any directories in there are present in the output structure + dir_parts = os.path.dirname(target).split(os.sep) + for i, part in enumerate(reversed(dir_parts)): + # Also look for "out" directory in case of hardlinks + if part == "work" or (part == "out" and hardlinks): + dn = os.path.join(dn, *dir_parts[len(dir_parts) - i :]) + break if not hardlinks and path_really_within(target, os.path.dirname(run_dir)): # make symlink relative target = os.path.relpath(target, start=os.path.realpath(dn)) link = os.path.join(dn, os.path.basename(v.value.rstrip("/"))) - os.makedirs(dn, exist_ok=False) + if link_destinations.get(link, target) != target: + raise FileExistsError( + f"Two files have the same link destination: " + f"{link_destinations[link]} and {target} are both " + f"written to {link}." + ) + link_destinations[link] = target + os.makedirs(dn, exist_ok=use_relative_output_paths) if hardlinks: # TODO: what if target is an input from a different filesystem? if isinstance(v, Value.Directory): @@ -793,11 +822,39 @@ def map_paths(v: Value.Base, dn: str) -> Value.Base: v.value[key] = map_paths(v.value[key], os.path.join(dn, key)) return v - os.makedirs(os.path.join(run_dir, "out"), exist_ok=False) + def map_paths_relative(v: Value.Base, dn: str) -> Value.Base: + if isinstance(v, (Value.File, Value.Directory)): + # Fall back on map paths to use all the correct linking code. 
+ return map_paths(v, dn) + elif isinstance(v, Value.Array) and v.value: + for i in range(len(v.value)): + v.value[i] = map_paths_relative(v.value[i], dn) + elif isinstance(v, Value.Map): + for i, b in enumerate(v.value): + v.value[i] = (b[0], map_paths_relative(b[1], dn)) + elif isinstance(v, Value.Pair): + v.value = ( + map_paths_relative(v.value[0], dn), + map_paths_relative(v.value[1], dn), + ) + elif isinstance(v, Value.Struct): + for key in v.value: + v.value[key] = map_paths_relative(v.value[key], dn) + return v + + out_dir = os.path.join(run_dir, "out") + os.makedirs(out_dir, exist_ok=False) + + if use_relative_output_paths: + return outputs.map( + lambda binding: Env.Binding( + binding.name, map_paths_relative(copy.deepcopy(binding.value), out_dir) + ) + ) return outputs.map( lambda binding: Env.Binding( binding.name, - map_paths(copy.deepcopy(binding.value), os.path.join(run_dir, "out", binding.name)), + map_paths(copy.deepcopy(binding.value), os.path.join(out_dir, binding.name)), ) ) diff --git a/WDL/runtime/workflow.py b/WDL/runtime/workflow.py index 26568e3c..ac7ce6e2 100644 --- a/WDL/runtime/workflow.py +++ b/WDL/runtime/workflow.py @@ -780,7 +780,11 @@ def run_local_workflow( ) ) _outputs = link_outputs( - cache, cached, run_dir, hardlinks=cfg["file_io"].get_bool("output_hardlinks") + cache, + cached, + run_dir, + hardlinks=cfg["file_io"].get_bool("output_hardlinks"), + use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"), ) write_values_json( cached, os.path.join(run_dir, "outputs.json"), namespace=workflow.name @@ -932,7 +936,12 @@ def _workflow_main_loop( # create output_links outputs = link_outputs( - cache, state.outputs, run_dir, hardlinks=cfg["file_io"].get_bool("output_hardlinks") + cache, + state.outputs, + run_dir, + hardlinks=cfg["file_io"].get_bool("output_hardlinks"), + # Relative output paths only make sense at the top level, and hence is only used here. 
+ use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"), ) # process outputs through plugins diff --git a/tests/runner.t b/tests/runner.t index 38be67de..89bde96c 100644 --- a/tests/runner.t +++ b/tests/runner.t @@ -11,7 +11,7 @@ source tests/bash-tap/bash-tap-bootstrap export PYTHONPATH="$SOURCE_DIR:$PYTHONPATH" miniwdl="python3 -m WDL" -plan tests 82 +plan tests 89 $miniwdl run_self_test is "$?" "0" "run_self_test" @@ -163,6 +163,81 @@ is "$?" "0" "copy_source scatter_echo.wdl" cmp -s echo_task.wdl scatterrun/wdl/echo_task.wdl is "$?" "0" "copy_source echo_task.wdl" +cat << 'EOF' > create_files.wdl +version 1.0 +task create_files { + command <<< + mkdir -p large_matryoshka/medium_matryoskha + touch large_matryoshka/medium_matryoskha/small_matryoshka + mkdir -p utensils + touch utensils/fork utensils/knife utensils/spoon + touch unboxed_item + mkdir -p turtle/turtle/turtle/turtle/turtle/turtle/ + touch turtle/turtle/turtle/turtle/turtle/turtle/turtle + >>> + output { + File small_matryoshka = "large_matryoshka/medium_matryoskha/small_matryoshka" + File fork = "utensils/fork" + File knife = "utensils/knife" + File spoon = "utensils/spoon" + File unboxed_item = "unboxed_item" + File all_the_way_down = "turtle/turtle/turtle/turtle/turtle/turtle/turtle" + # Below arrays will create collisions as these are the same paths as above, + # localized to the same links. This should be handled properly. 
+ Array[File] utensils = [fork, knife, spoon] + Array[File] everything = [small_matryoshka, fork, knife, spoon, unboxed_item, all_the_way_down] + } +} +EOF + +cat << 'EOF' > correct_workflow.wdl +version 1.0 +import "create_files.wdl" as create_files + +workflow filecreator { + call create_files.create_files as createFiles {} + + output { + Array[File] utensils = createFiles.utensils + Array[File] all_stuff = createFiles.everything + } +} +EOF + +OUTPUT_DIR=use_relative_paths/_LAST/out/ +MINIWDL__FILE_IO__USE_RELATIVE_OUTPUT_PATHS=true $miniwdl run --dir use_relative_paths correct_workflow.wdl +is "$?" "0" relative_output_paths +test -L $OUTPUT_DIR/large_matryoshka/medium_matryoskha/small_matryoshka +is "$?" "0" "outputs are relative" +test -d $OUTPUT_DIR/turtle/turtle/turtle/turtle +is "$?" "0" "outputs are relative all the way down" + +MINIWDL__FILE_IO__USE_RELATIVE_OUTPUT_PATHS=true MINIWDL__FILE_IO__OUTPUT_HARDLINKS=true \ +$miniwdl run --dir use_relative_paths correct_workflow.wdl +is "$?" "0" relative_output_paths_with_hardlink +test -f $OUTPUT_DIR/large_matryoshka/medium_matryoskha/small_matryoshka +is "$?" "0" "outputs are relative using hardlinks" +test -d $OUTPUT_DIR/turtle/turtle/turtle/turtle +is "$?" "0" "outputs are relative all the way down using hardlinks" + +cat << 'EOF' > colliding_workflow.wdl +version 1.0 +import "create_files.wdl" as create_files + +workflow filecollider { + call create_files.create_files as createFiles {} + call create_files.create_files as createFiles2 {} + output { + Array[File] all_stuff1 = createFiles.everything + Array[File] all_stuff2 = createFiles2.everything + } +} +EOF + +MINIWDL__FILE_IO__USE_RELATIVE_OUTPUT_PATHS=true $miniwdl run --dir use_relative_paths colliding_workflow.wdl 2> errors.txt +grep -q "Two files have the same link destination" errors.txt +is "$?" "0" "use_relative_output_paths throws error on collisions" + cat << 'EOF' > failer2000.wdl version 1.0