Skip to content

Commit

Permalink
Merge 64ca963 into 3f972ff
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Oct 17, 2021
2 parents 3f972ff + 64ca963 commit f116b13
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 18 deletions.
5 changes: 5 additions & 0 deletions WDL/runtime/config_templates/default.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
# 0 = default to host `nproc`.
# -@
call_concurrency = 0
# call_concurrency applies to URI download tasks too; however, it may be desirable to limit their
# concurrency even further, since they're exceptionally I/O-intensive. In that case, set
# download_concurrency to a nonzero value lower than the effective call_concurrency.
# (New in v1.3.1)
download_concurrency = 0
# container backend; docker_swarm (default), singularity, or as added by plug-ins
container_backend = docker_swarm
# When one task fails, immediately terminate all other running tasks. If disabled, stop launching
Expand Down
53 changes: 35 additions & 18 deletions WDL/runtime/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,15 +918,39 @@ def _download_input_files(
transiently memoized.
"""

# scan for URIs and schedule their downloads on the thread pool
ops = {}
# scan inputs for URIs
uris = set()

def schedule_download(v: Union[Value.File, Value.Directory]) -> str:
nonlocal ops, uris
def scan_uri(v: Union[Value.File, Value.Directory]) -> str:
nonlocal uris
directory = isinstance(v, Value.Directory)
uri = v.value
if uri not in uris and downloadable(cfg, uri, directory=directory):
uris.add((uri, directory))
return uri

Value.rewrite_env_paths(inputs, scan_uri)
if not uris:
return
logger.notice(_("downloading input URIs", count=len(uris)))

# download them on the thread pool (but possibly further limiting concurrency)
download_concurrency = cfg.get_int("scheduler", "download_concurrency")
if download_concurrency <= 0:
download_concurrency = 999999
ops = {}
incomplete = len(uris)
outstanding = []
downloaded_bytes = 0
cached_hits = 0
exn = None

while incomplete and not exn:
assert len(outstanding) <= incomplete

# top up thread pool's queue (up to download_concurrency)
while uris and len(outstanding) < download_concurrency:
(uri, directory) = uris.pop()
logger.info(
_(f"schedule input {'directory' if directory else 'file'} download", uri=uri)
)
Expand All @@ -941,25 +965,16 @@ def schedule_download(v: Union[Value.File, Value.Directory]) -> str:
logger_prefix=logger_prefix + [f"download{len(ops)}"],
)
ops[future] = uri
uris.add(uri)
return uri

Value.rewrite_env_paths(inputs, schedule_download)
if not ops:
return
logger.notice(_("downloading input URIs", count=len(ops))) # pyre-fixme
outstanding.append(future)
assert outstanding

# collect the results, with "clean" fail-fast
outstanding = ops.keys()
downloaded_bytes = 0
cached_hits = 0
exn = None
while outstanding:
        # wait for one or more outstanding downloads to finish
just_finished, still_outstanding = futures.wait(
outstanding, return_when=futures.FIRST_EXCEPTION
)
outstanding = still_outstanding
for future in just_finished:
# check results
try:
future_exn = future.exception()
except futures.CancelledError:
Expand All @@ -979,9 +994,11 @@ def schedule_download(v: Union[Value.File, Value.Directory]) -> str:
outsfut.cancel()
os.kill(os.getpid(), signal.SIGUSR1)
exn = future_exn
incomplete -= 1

if exn:
raise exn
logger.notice( # pyre-fixme
logger.notice(
_(
"processed input URIs",
cached=cached_hits,
Expand Down

0 comments on commit f116b13

Please sign in to comment.