Skip to content

Commit

Permalink
run: provide command-line override of concurrency limit (max_workers)
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Oct 2, 2019
1 parent 9f56926 commit dda64bf
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 9 deletions.
29 changes: 21 additions & 8 deletions WDL/CLI.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import docker
from shlex import quote as shellquote
from datetime import datetime
from argparse import ArgumentParser, Action
from argparse import ArgumentParser, Action, SUPPRESS
import pkg_resources
from . import *
from ._util import (
Expand Down Expand Up @@ -299,12 +299,18 @@ def fill_run_subparser(subparsers):
dest="input_file",
help="file with Cromwell-style input JSON; command-line inputs will be merged in",
)
run_parser.add_argument(
"-t",
"--task",
metavar="TASK_NAME",
help="name of task to run (for WDL documents with multiple tasks & no workflow)",
)
run_parser.add_argument(
"-j",
"--json",
dest="json_only",
action="store_true",
help="just print Cromwell-style input JSON to standard output, then exit",
help=SUPPRESS, # "just print Cromwell-style input JSON to standard output, then exit",
)
run_parser.add_argument(
"-d",
Expand All @@ -316,13 +322,15 @@ def fill_run_subparser(subparsers):
run_parser.add_argument(
"--copy-input-files",
action="store_true",
help="copy input files for each task (for compatibility with commands assuming write access to them)",
help="copy input files for each task (for compatibility with mv/rm/write commands)",
)
run_parser.add_argument(
"-t",
"--task",
metavar="TASK_NAME",
help="name of task to run (for WDL documents with multiple tasks & no workflow)",
"-@",
metavar="N",
dest="max_workers",
type=int,
default=None,
help="maximum concurrent tasks; defaults to # of processors (limit effectively lower when tasks require multiple processors)",
)
run_parser.add_argument(
"-v",
Expand All @@ -346,6 +354,7 @@ def runner(
rundir=None,
path=None,
copy_input_files=False,
max_workers=None,
**kwargs,
):
# load WDL document
Expand Down Expand Up @@ -385,7 +394,11 @@ def runner(
runtime.run_local_task if isinstance(target, Task) else runtime.run_local_workflow
)
rundir, output_env = entrypoint(
target, input_env, run_dir=rundir, copy_input_files=copy_input_files
target,
input_env,
run_dir=rundir,
copy_input_files=copy_input_files,
max_workers=max_workers,
)
except Exception as exn:
if isinstance(exn, runtime.task.TaskFailure):
Expand Down
1 change: 1 addition & 0 deletions WDL/runtime/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ def run_local_task(
run_dir: Optional[str] = None,
copy_input_files: bool = False,
logger_prefix: str = "wdl:",
max_workers: Optional[int] = None, # unused
) -> Tuple[str, Env.Bindings[Value.Base]]:
"""
Run a task locally.
Expand Down
5 changes: 4 additions & 1 deletion WDL/runtime/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ def run_local_workflow(
run_dir: Optional[str] = None,
copy_input_files: bool = False,
logger_prefix: str = "wdl:",
max_workers: Optional[int] = None,
_test_pickle: bool = False,
) -> Tuple[str, Env.Bindings[Value.Base]]:
"""
Expand All @@ -584,7 +585,8 @@ def run_local_workflow(
exist; if it does, a timestamp-based subdirectory is created and used (defaults
to current working directory)
"""
max_workers = multiprocessing.cpu_count()
if max_workers is None:
max_workers = multiprocessing.cpu_count()
thread_pool = futures.ThreadPoolExecutor(max_workers=max_workers)
future_task_map = {}

Expand Down Expand Up @@ -630,6 +632,7 @@ def run_local_workflow(
run_id=next_call.id,
run_dir=os.path.join(run_dir, next_call.id),
copy_input_files=copy_input_files,
max_workers=max_workers,
logger_prefix=(logger_id + ":"),
)
future_task_map[future] = next_call.id
Expand Down

0 comments on commit dda64bf

Please sign in to comment.