Skip to content

Commit

Permalink
Merge 77348c3 into 7f9fadb
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Nov 28, 2019
2 parents 7f9fadb + 77348c3 commit 66f72ee
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 13 deletions.
39 changes: 27 additions & 12 deletions WDL/runtime/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,20 +816,35 @@ def schedule_downloads(v: Value.Base) -> None:
return inputs
logger.notice(_("downloading input files", count=len(ops))) # pyre-fixme

# collect the results
# collect the results, with "clean" fail-fast
outstanding = ops.keys()
downloaded = {}
total_bytes = 0
try:
for future in futures.as_completed(ops):
uri = ops[future]
downloaded[uri] = future.result()
sz = os.path.getsize(downloaded[uri])
logger.info(_("downloaded input file", uri=uri, file=downloaded[uri], bytes=sz))
total_bytes += sz
except:
for future in ops:
future.cancel()
raise
exn = None
while outstanding:
just_finished, still_outstanding = futures.wait(
outstanding, return_when=futures.FIRST_EXCEPTION
)
outstanding = still_outstanding
for future in just_finished:
try:
future_exn = future.exception()
except futures.CancelledError:
future_exn = Terminated()
if not future_exn:
uri = ops[future]
downloaded[uri] = future.result()
sz = os.path.getsize(downloaded[uri])
logger.info(_("downloaded input file", uri=uri, file=downloaded[uri], bytes=sz))
total_bytes += sz
elif not exn:
# cancel pending ops and signal running ones to abort
for outsfut in outstanding:
outsfut.cancel()
os.kill(os.getpid(), signal.SIGUSR1)
exn = future_exn
if exn:
raise exn
logger.notice( # pyre-fixme
_("downloaded input files", count=len(downloaded), total_bytes=total_bytes)
)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_6workflowrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -993,5 +993,5 @@ def test_download_input_files(self):
}
"""
self._test_workflow(count, {"files": ["https://google.com/robots.txt", "https://raw.githubusercontent.com/chanzuckerberg/miniwdl/master/tests/alyssa_ben.txt"]})
self._test_workflow(count, {"files": ["https://google.com/robots.txt", "https://raw.githubusercontent.com/chanzuckerberg/miniwdl/master/nonexistent12345.txt"]},
self._test_workflow(count, {"files": ["https://google.com/robots.txt", "https://raw.githubusercontent.com/chanzuckerberg/miniwdl/master/nonexistent12345.txt", "https://raw.githubusercontent.com/chanzuckerberg/miniwdl/master/tests/alyssa_ben.txt"]},
expected_exception=WDL.runtime.DownloadFailed)

0 comments on commit 66f72ee

Please sign in to comment.