Skip to content

Commit

Permalink
Issue/3960 scatter outputs should not clash (#3) (#3968)
Browse files Browse the repository at this point in the history
* Issue/3960 scatter outputs should not clash (#3)

* feat: adding unit test to detect defect

* feat: wip for scatter

* bugfix: detect duplicate targets for Files and write out files

* bugfix: rename test cases to be more descriptive

* feat: expand test and code cleanup

* feat: dont change tests that I dont need too

* feat: format and fix whitespace

* bugfix: fix lint issue

* feat: PR test fix

* feat: remove old comment
  • Loading branch information
kannon92 committed Dec 29, 2021
1 parent 5c6b540 commit 4286352
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 6 deletions.
54 changes: 48 additions & 6 deletions src/toil/cwl/cwltoil.py
Expand Up @@ -96,6 +96,8 @@
from schema_salad.avro.schema import Names
from schema_salad.exceptions import ValidationException
from schema_salad.sourceline import SourceLine
from schema_salad.ref_resolver import file_uri, uri_file_path


from toil.batchSystems.registry import DEFAULT_BATCH_SYSTEM
from toil.common import Config, Toil, addOptions
Expand Down Expand Up @@ -852,12 +854,42 @@ def visit(
# If we didn't download something that is a toilfile:
# reference, we just pass that along.

logger.debug(
"ToilPathMapper adding file mapping %s -> %s", deref, tgt
)
self._pathmap[path] = MapperEnt(
deref, tgt, "WritableFile" if copy else "File", staged
)
"""Link or copy files to their targets. Create them as needed."""
targets: Dict[str, str] = {}
for _, value in self._pathmap.items():
# If the target already exists in the pathmap, it means we have a conflict. But we didn't change tgt to reflect new name.
new_target = value.target.rpartition("_")[0]
if value.target == tgt: # Conflict detected in the pathmap
new_target = tgt
if new_target and new_target == tgt:
i = 2
new_tgt = f"{tgt}_{i}"
while new_tgt in targets:
i += 1
new_tgt = f"{tgt}_{i}"
targets[new_tgt] = new_tgt

for _, value_conflict in targets.items():
logger.debug(
"ToilPathMapper adding file mapping for conflict %s -> %s",
deref,
value_conflict,
)
self._pathmap[path] = MapperEnt(
deref,
value_conflict,
"WritableFile" if copy else "File",
staged,
)
# No conflicts detected so we can write out the original name.
if not targets:
logger.debug(
"ToilPathMapper adding file mapping %s -> %s", deref, tgt
)

self._pathmap[path] = MapperEnt(
deref, tgt, "WritableFile" if copy else "File", staged
)

# Handle all secondary files that need to be next to this one.
self.visitlisting(
Expand Down Expand Up @@ -1655,6 +1687,16 @@ def _collectDirEntries(
# This is all the CWL File and Directory objects we need to export.
jobfiles = list(_collectDirEntries(cwljob))

def _realpath(
ob: CWLObjectType,
) -> None:
location = cast(str, ob["location"])
if location.startswith("file:"):
ob["location"] = file_uri(os.path.realpath(uri_file_path(location)))
logger.debug("realpath %s" % ob["location"])

visit_class(jobfiles, ("File", "Directory"), _realpath)

# Now we need to save all the output files and directories.
# We shall use a ToilPathMapper.
pm = ToilPathMapper(
Expand Down
28 changes: 28 additions & 0 deletions src/toil/test/cwl/cwlTest.py
Expand Up @@ -699,6 +699,34 @@ def test_workflow_echo_string(self):
assert b'Finished toil run successfully' in stderr
assert p.returncode == 0


def test_workflow_echo_string_scatter_capture_stdout(self):
toil = 'toil-cwl-runner'
jobstore = f'--jobStore=file:explicit-local-jobstore-{uuid.uuid4()}'
option_1 = '--strict-memory-limit'
option_2 = '--force-docker-pull'
option_3 = '--clean=always'
cwl = os.path.join(os.path.dirname(__file__), 'echo_string_scatter_capture_stdout.cwl')
cmd = [toil, jobstore, option_1, option_2, option_3, cwl]
log.debug(f'Now running: {" ".join(cmd)}')
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
outputs = json.loads(stdout)
out_list = outputs["list_out"]
assert len(out_list) == 2, f"outList shoud have two file elements {out_list}"
out_base = outputs["list_out"][0]
# This is a test on the scatter functionality and stdout.
# Each value of scatter should generate a separate file in the output.
for index, file in enumerate(out_list):
if index > 0:
new_file_loc = out_base["location"] + f'_{index + 1}'
else:
new_file_loc = out_base["location"]
assert new_file_loc == file['location'], f"Toil should have detected conflicts for these stdout files {new_file_loc} and {file}"

assert b'Finished toil run successfully' in stderr
assert p.returncode == 0

def test_visit_top_cwl_class(self):
structure = {
'class': 'Directory',
Expand Down
57 changes: 57 additions & 0 deletions src/toil/test/cwl/echo_string_scatter_capture_stdout.cwl
@@ -0,0 +1,57 @@
cwlVersion: v1.2
class: Workflow

inputs:
- id: arrayS
type: string[]
default: ['hello','world']

steps:
hello:
run:
class: CommandLineTool
inputs:
s: string
baseCommand: [echo]
arguments: [ $(inputs.s)]
outputs:
out: stdout
err: stderr
in:
- id: s
source: arrayS
scatter:
- s
out:
- id: out
- id: err

list:
run:
class: CommandLineTool
inputs:
file: File
baseCommand: [ ls , -lh]
arguments: [ $(inputs.file) ]
stdout: "list.out"
outputs:
list_out: stdout
in:
- id: file
linkMerge: merge_flattened
source:
- hello/out
scatter:
- file
out:
- id: list_out

outputs:
- id: list_out
type: File[]
outputSource: ["list/list_out"]


requirements:
- class: ScatterFeatureRequirement
- class: MultipleInputFeatureRequirement

0 comments on commit 4286352

Please sign in to comment.