diff --git a/WDL/runtime/config_templates/default.cfg b/WDL/runtime/config_templates/default.cfg index c4e7de44..d7ea3d00 100644 --- a/WDL/runtime/config_templates/default.cfg +++ b/WDL/runtime/config_templates/default.cfg @@ -15,6 +15,11 @@ # 0 = default to host `nproc`. # -@ call_concurrency = 0 +# call_concurrency applies to URI download tasks too, however, it may be desirable to limit their +# concurrency further still, since they're exceptionally I/O-intensive. In that case set +# download_concurrency to a nonzero value lower than the effective call_concurrency. +# (New in v1.3.1) +download_concurrency = 0 # container backend; docker_swarm (default), singularity, or as added by plug-ins container_backend = docker_swarm # When one task fails, immediately terminate all other running tasks. If disabled, stop launching diff --git a/WDL/runtime/workflow.py b/WDL/runtime/workflow.py index 5451951d..84382cea 100644 --- a/WDL/runtime/workflow.py +++ b/WDL/runtime/workflow.py @@ -918,15 +918,39 @@ def _download_input_files( transiently memoized. 
""" - # scan for URIs and schedule their downloads on the thread pool - ops = {} + # scan inputs for URIs uris = set() - def schedule_download(v: Union[Value.File, Value.Directory]) -> str: - nonlocal ops, uris + def scan_uri(v: Union[Value.File, Value.Directory]) -> str: - nonlocal uris directory = isinstance(v, Value.Directory) uri = v.value if uri not in uris and downloadable(cfg, uri, directory=directory): + uris.add((uri, directory)) + return uri + + Value.rewrite_env_paths(inputs, scan_uri) + if not uris: + return + logger.notice(_("downloading input URIs", count=len(uris))) + + # download them on the thread pool (but possibly further limiting concurrency) + download_concurrency = cfg.get_int("scheduler", "download_concurrency") + if download_concurrency <= 0: + download_concurrency = 999999 + ops = {} + incomplete = len(uris) + outstanding = [] + downloaded_bytes = 0 + cached_hits = 0 + exn = None + + while incomplete and not exn: + assert len(outstanding) <= incomplete + + # top up thread pool's queue (up to download_concurrency) + while uris and len(outstanding) < download_concurrency: + (uri, directory) = uris.pop() logger.info( _(f"schedule input {'directory' if directory else 'file'} download", uri=uri) ) @@ -941,25 +965,16 @@ def schedule_download(v: Union[Value.File, Value.Directory]) -> str: logger_prefix=logger_prefix + [f"download{len(ops)}"], ) ops[future] = uri - uris.add(uri) - return uri - - Value.rewrite_env_paths(inputs, schedule_download) - if not ops: - return - logger.notice(_("downloading input URIs", count=len(ops))) # pyre-fixme + outstanding.append(future) + assert outstanding - # collect the results, with "clean" fail-fast - outstanding = ops.keys() - downloaded_bytes = 0 - cached_hits = 0 - exn = None - while outstanding: + # wait for one or more outstanding downloads to finish just_finished, still_outstanding = futures.wait( outstanding, return_when=futures.FIRST_EXCEPTION ) outstanding = still_outstanding for future in 
just_finished: + # check results try: future_exn = future.exception() except futures.CancelledError: @@ -979,9 +994,11 @@ def schedule_download(v: Union[Value.File, Value.Directory]) -> str: outsfut.cancel() os.kill(os.getpid(), signal.SIGUSR1) exn = future_exn + incomplete -= 1 + if exn: raise exn - logger.notice( # pyre-fixme + logger.notice( _( "processed input URIs", cached=cached_hits,