From 95f4e44047cf1ffd8dbbb6979f3507618fb4f316 Mon Sep 17 00:00:00 2001 From: Mike Lin Date: Sat, 16 Oct 2021 03:07:30 -1000 Subject: [PATCH 1/2] add [scheduler] download_concurrency to limit number of concurrent downloads specifically --- WDL/runtime/config_templates/default.cfg | 4 ++ WDL/runtime/workflow.py | 53 ++++++++++++++++-------- 2 files changed, 39 insertions(+), 18 deletions(-) diff --git a/WDL/runtime/config_templates/default.cfg b/WDL/runtime/config_templates/default.cfg index 108366f3..e6e4e79c 100644 --- a/WDL/runtime/config_templates/default.cfg +++ b/WDL/runtime/config_templates/default.cfg @@ -15,6 +15,10 @@ # 0 = default to host `nproc`. # -@ call_concurrency = 0 +# It may be particularly counterproductive to run too many downloader tasks concurrently, since +# they're so I/O-intensive. Therefore download_concurrency can be set lower than call_concurrency +# to limit the number of concurrent download tasks specifically. +download_concurrency = 0 # container backend; docker_swarm (default), singularity, or as added by plug-ins container_backend = docker_swarm # When one task fails, immediately terminate all other running tasks. If disabled, stop launching diff --git a/WDL/runtime/workflow.py b/WDL/runtime/workflow.py index 5451951d..84382cea 100644 --- a/WDL/runtime/workflow.py +++ b/WDL/runtime/workflow.py @@ -918,15 +918,39 @@ def _download_input_files( transiently memoized. 
""" - # scan for URIs and schedule their downloads on the thread pool - ops = {} + # scan inputs for URIs uris = set() - def schedule_download(v: Union[Value.File, Value.Directory]) -> str: - nonlocal ops, uris + def scan_uri(v: Union[Value.File, Value.Directory]) -> str: + nonlocal uris directory = isinstance(v, Value.Directory) uri = v.value if uri not in uris and downloadable(cfg, uri, directory=directory): + uris.add((uri, directory)) + return uri + + Value.rewrite_env_paths(inputs, scan_uri) + if not uris: + return + logger.notice(_("downloading input URIs", count=len(uris))) + + # download them on the thread pool (but possibly further limiting concurrency) + download_concurrency = cfg.get_int("scheduler", "download_concurrency") + if download_concurrency <= 0: + download_concurrency = 999999 + ops = {} + incomplete = len(uris) + outstanding = [] + downloaded_bytes = 0 + cached_hits = 0 + exn = None + + while incomplete and not exn: + assert len(outstanding) <= incomplete + + # top up thread pool's queue (up to download_concurrency) + while uris and len(outstanding) < download_concurrency: + (uri, directory) = uris.pop() logger.info( _(f"schedule input {'directory' if directory else 'file'} download", uri=uri) ) @@ -941,25 +965,16 @@ def schedule_download(v: Union[Value.File, Value.Directory]) -> str: logger_prefix=logger_prefix + [f"download{len(ops)}"], ) ops[future] = uri - uris.add(uri) - return uri - - Value.rewrite_env_paths(inputs, schedule_download) - if not ops: - return - logger.notice(_("downloading input URIs", count=len(ops))) # pyre-fixme + outstanding.append(future) + assert outstanding - # collect the results, with "clean" fail-fast - outstanding = ops.keys() - downloaded_bytes = 0 - cached_hits = 0 - exn = None - while outstanding: + # wait for one or more outstanding downloads to finish just_finished, still_outstanding = futures.wait( outstanding, return_when=futures.FIRST_EXCEPTION ) outstanding = still_outstanding for future in 
just_finished: + # check results try: future_exn = future.exception() except futures.CancelledError: @@ -979,9 +994,11 @@ def schedule_download(v: Union[Value.File, Value.Directory]) -> str: outsfut.cancel() os.kill(os.getpid(), signal.SIGUSR1) exn = future_exn + incomplete -= 1 + if exn: raise exn - logger.notice( # pyre-fixme + logger.notice( _( "processed input URIs", cached=cached_hits, From 64ca963e0a062a747869d0719e75bbccafdd26d3 Mon Sep 17 00:00:00 2001 From: Mike Lin Date: Sat, 16 Oct 2021 11:47:00 -1000 Subject: [PATCH 2/2] polish --- WDL/runtime/config_templates/default.cfg | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/WDL/runtime/config_templates/default.cfg b/WDL/runtime/config_templates/default.cfg index e6e4e79c..e468eef7 100644 --- a/WDL/runtime/config_templates/default.cfg +++ b/WDL/runtime/config_templates/default.cfg @@ -15,9 +15,10 @@ # 0 = default to host `nproc`. # -@ call_concurrency = 0 -# It may be particularly counterproductive to run too many downloader tasks concurrently, since -# they're so I/O-intensive. Therefore download_concurrency can be set lower than call_concurrency -# to limit the number of concurrent download tasks specifically. +# call_concurrency applies to URI download tasks too; however, it may be desirable to limit their +# concurrency further still, since they're exceptionally I/O-intensive. In that case, set +# download_concurrency to a nonzero value lower than the effective call_concurrency. +# (New in v1.3.1) download_concurrency = 0 # container backend; docker_swarm (default), singularity, or as added by plug-ins container_backend = docker_swarm