Skip to content

Commit

Permalink
Merge 85d934d into 79bfcb6
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Nov 21, 2019
2 parents 79bfcb6 + 85d934d commit 20adb17
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 30 deletions.
32 changes: 22 additions & 10 deletions WDL/CLI.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,10 @@ def fill_run_subparser(subparsers):
"-d",
"--dir",
metavar="DIR",
dest="rundir",
dest="run_dir",
help="directory under which to create a timestamp-named subdirectory for this run (defaults to current working directory); supply '.' or 'some/dir/.' to instead run in this directory exactly",
)
group = run_parser.add_argument_group("provisioning")
group = run_parser.add_argument_group("task runtime")
group.add_argument(
"-@",
metavar="N",
Expand All @@ -332,19 +332,26 @@ def fill_run_subparser(subparsers):
help="maximum concurrent tasks (default: # host processors, effectively lower when tasks require multiple processors)",
)
group.add_argument(
"--max-runtime-cpu",
"--runtime-cpu-max",
metavar="N",
type=int,
default=None,
help="maximum effective runtime.cpu for any task (default: # host processors)",
)
group.add_argument(
"--max-runtime-memory",
"--runtime-memory-max",
metavar="N",
type=str,
default=None,
help="maximum effective runtime.memory for any task (default: total host memory)",
)
group.add_argument(
"--runtime-defaults",
metavar="JSON",
type=str,
default=None,
help="""default runtime settings for all tasks (JSON filename or literal object e.g. '{"maxRetries":2}')""",
)
group.add_argument(
"--copy-input-files",
action="store_true",
Expand Down Expand Up @@ -378,6 +385,8 @@ def runner(
input_file=None,
empty=[],
json_only=False,
runtime_defaults=None,
runtime_memory_max=None,
path=None,
check_quant=True,
**kwargs,
Expand Down Expand Up @@ -416,13 +425,16 @@ def runner(

# configuration
run_kwargs = dict(
(k, kwargs[k])
for k in ["copy_input_files", "max_runtime_cpu", "max_runtime_memory", "as_me"]
(k, kwargs[k]) for k in ["copy_input_files", "run_dir", "runtime_cpu_max", "as_me"]
)
if run_kwargs["max_runtime_memory"]:
run_kwargs["max_runtime_memory"] = parse_byte_size(run_kwargs["max_runtime_memory"])
if "rundir" in kwargs:
run_kwargs["run_dir"] = kwargs["rundir"]
if runtime_memory_max:
run_kwargs["runtime_memory_max"] = parse_byte_size(runtime)
if runtime_defaults:
if runtime_defaults.lstrip()[0] == "{":
run_kwargs["runtime_defaults"] = json.loads(runtime_defaults)
else:
with open(runtime_defaults, "r") as infile:
run_kwargs["runtime_defaults"] = json.load(infile)

ensure_swarm(logger)

Expand Down
43 changes: 28 additions & 15 deletions WDL/runtime/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,9 @@ def run_local_task(
run_id: Optional[str] = None,
run_dir: Optional[str] = None,
copy_input_files: bool = False,
max_runtime_cpu: Optional[int] = None,
max_runtime_memory: Optional[int] = None,
runtime_defaults: Optional[Dict[str, Union[str, int]]] = None,
runtime_cpu_max: Optional[int] = None,
runtime_memory_max: Optional[int] = None,
logger_prefix: Optional[List[str]] = None,
as_me: bool = False,
) -> Tuple[str, Env.Bindings[Value.Base]]:
Expand All @@ -511,8 +512,9 @@ def run_local_task(
(defaults to current working directory).
If the final path component is ".", then operate in run_dir directly.
:param copy_input_files: copy input files and mount them read/write instead of read-only
:param max_runtime_cpu: maximum effective runtime.cpu value (default: # host CPUs)
:param max_runtime_memory: maximum effective runtime.memory value in bytes (default: total host
:param runtime_defaults: default values for runtime settings
:param runtime_cpu_max: maximum effective runtime.cpu value (default: # host CPUs)
:param runtime_memory_max: maximum effective runtime.memory value in bytes (default: total host
memory)
:param as_me: run container command using the current user uid:gid (may break commands that
assume root access, e.g. apt-get)
Expand Down Expand Up @@ -553,7 +555,7 @@ def run_local_task(

# evaluate runtime fields
runtime = _eval_task_runtime(
logger, task, container_env, max_runtime_cpu, max_runtime_memory
logger, task, container_env, runtime_defaults, runtime_cpu_max, runtime_memory_max
)
container.image_tag = str(runtime.get("docker", container.image_tag))
container.as_me = as_me
Expand Down Expand Up @@ -724,12 +726,23 @@ def _eval_task_runtime(
logger: logging.Logger,
task: Tree.Task,
env: Env.Bindings[Value.Base],
max_runtime_cpu: Optional[int],
max_runtime_memory: Optional[int],
runtime_defaults: Optional[Dict[str, Union[str, int]]],
runtime_cpu_max: Optional[int],
runtime_memory_max: Optional[int],
) -> Dict[str, Union[int, str]]:
global _host_memory

runtime_values = dict((key, expr.eval(env)) for key, expr in task.runtime.items())
runtime_values = {}
if runtime_defaults:
for key, v in runtime_defaults.items():
if isinstance(v, str):
runtime_values[key] = Value.String(v)
elif isinstance(v, int):
runtime_values[key] = Value.Int(v)
else:
raise Error.InputError(f"invalid default runtime setting {key} = {v}")
for key, expr in task.runtime.items():
runtime_values[key] = expr.eval(env)
logger.debug(_("runtime values", **dict((key, str(v)) for key, v in runtime_values.items())))
ans = {}

Expand All @@ -739,7 +752,7 @@ def _eval_task_runtime(
if "cpu" in runtime_values:
cpu_value = runtime_values["cpu"].coerce(Type.Int()).value
assert isinstance(cpu_value, int)
cpu = max(1, min(max_runtime_cpu or multiprocessing.cpu_count(), cpu_value))
cpu = max(1, min(runtime_cpu_max or multiprocessing.cpu_count(), cpu_value))
if cpu != cpu_value:
logger.warning(
_("runtime.cpu adjusted to local limit", original=cpu_value, adjusted=cpu)
Expand All @@ -756,19 +769,19 @@ def _eval_task_runtime(
task.runtime["memory"], "invalid setting of runtime.memory, " + memory_str
)

if not max_runtime_memory:
if not runtime_memory_max:
_host_memory = _host_memory or psutil.virtual_memory().total
max_runtime_memory = _host_memory
assert isinstance(max_runtime_memory, int)
if memory_bytes > max_runtime_memory:
runtime_memory_max = _host_memory
assert isinstance(runtime_memory_max, int)
if memory_bytes > runtime_memory_max:
logger.warning(
_(
"runtime.memory adjusted to local limit",
original=memory_bytes,
adjusted=max_runtime_memory,
adjusted=runtime_memory_max,
)
)
memory_bytes = max_runtime_memory
memory_bytes = runtime_memory_max
ans["memory"] = memory_bytes

if "maxRetries" in runtime_values:
Expand Down
2 changes: 1 addition & 1 deletion WDL/runtime/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ def _log_status(self) -> None:
_(
"workflow steps",
waiting=len(self.waiting),
running=len(self.running),
outstanding=len(self.running),
finished=len(self.finished),
)
)
Expand Down
8 changes: 4 additions & 4 deletions tests/test_4taskrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,8 +731,8 @@ def test_cpu_limit(self):
# check task with overkill number of CPUs gets scheduled
outputs = self._test_task(txt, {"n": 8, "cpu": 9999})
self.assertLess(outputs["wall_seconds"], 8)
# check max_runtime_cpu set to 1 causes serialization
outputs = self._test_task(txt, {"n": 8, "cpu": 9999}, max_runtime_cpu=1)
# check runtime_cpu_max set to 1 causes serialization
outputs = self._test_task(txt, {"n": 8, "cpu": 9999}, runtime_cpu_max=1)
self.assertGreaterEqual(outputs["wall_seconds"], 8)

def test_runtime_memory(self):
Expand All @@ -753,8 +753,8 @@ def test_runtime_memory(self):
"""
self._test_task(txt, {"memory": "100000000"})
self._test_task(txt, {"memory": "1G"})
self._test_task(txt, {"memory": "99T"})
self._test_task(txt, {"memory": "99T"}, max_runtime_memory=WDL._util.parse_byte_size(" 123.45 MiB "))
self._test_task(txt, {"memory": "99T"}, runtime_defaults={"docker":"ubuntu:18.10","cpu":1})
self._test_task(txt, {"memory": "99T"}, runtime_memory_max=WDL._util.parse_byte_size(" 123.45 MiB "))
self._test_task(txt, {"memory": "-1"}, expected_exception=WDL.Error.EvalError)
self._test_task(txt, {"memory": "1Gaga"}, expected_exception=WDL.Error.EvalError)
self._test_task(txt, {"memory": "bogus"}, expected_exception=WDL.Error.EvalError)
Expand Down

0 comments on commit 20adb17

Please sign in to comment.