Skip to content

Commit

Permalink
Merge 2889915 into fd8ab70
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin authored Apr 8, 2020
2 parents fd8ab70 + 2889915 commit 866a834
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 13 deletions.
6 changes: 6 additions & 0 deletions WDL/runtime/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ def __getitem__(self, key: str) -> str:
def get_int(self, key: str) -> int:
return self._parent.get_int(self._section, key)

def get_float(self, key: str) -> float:
return self._parent.get_float(self._section, key)

def get_bool(self, key: str) -> bool:
return self._parent.get_bool(self._section, key)

Expand Down Expand Up @@ -202,6 +205,9 @@ def _parse(self, section: str, key: str, ty: str, parse: Callable[[str], _T]) ->
def get_int(self, section: str, key: str) -> int:
return self._parse(section, key, "int", int)

def get_float(self, section: str, key: str) -> float:
return self._parse(section, key, "float", float)

def get_bool(self, section: str, key: str) -> bool:
return self._parse(section, key, "bool", _parse_bool)

Expand Down
12 changes: 9 additions & 3 deletions WDL/runtime/config_templates/default.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,19 @@ copy_input_files = false
# arising from files with multiple hardlinks! See also [task_runtime] delete_work, below.
output_hardlinks = false


[task_runtime]
# Effective maximum values of runtime.cpu and runtime.memory (bytes), which evaluated values are
# rounded down to. 0 = detect host resources, -1 = do not apply a limit.
# Effective maximum values of runtime.cpu and runtime.memory (bytes), which evaluated values may be
# rounded down to in order to "fit" the maximum available host resources. Warning: tasks may
# deadlock if these are set higher than actual achievable resources.
# 0 = detect host resources, -1 = do not apply a limit.
# --runtime-cpu-max, --runtime-memory-max
# Warning: tasks may deadlock if these are set higher than actual provision-able resources.
cpu_max = 0
memory_max = 0
# A task's runtime.memory is used as a "reservation" to guide container scheduling, but isn't an
# enforced limit unless memory_limit_multiplier is positive, which sets a hard limit of
# memory_limit_multiplier*runtime.memory. Recommendation: if activating this, disable host swap.
memory_limit_multiplier = 0.0
# Defaults which each task's runtime{} section will be merged into. --runtime-defaults
defaults = {
"docker": "ubuntu:18.04"
Expand Down
48 changes: 38 additions & 10 deletions WDL/runtime/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,14 @@ def copy_input_files(self, logger: logging.Logger) -> None:
os.makedirs(os.path.dirname(host_copy_filename), exist_ok=True)
shutil.copy(host_filename, host_copy_filename)

def run(self, logger: logging.Logger, command: str, cpu: int, memory: int) -> None:
def run(
self,
logger: logging.Logger,
command: str,
cpu: int,
memory_reservation: int,
memory_limit: int,
) -> None:
"""
1. Container is instantiated with the configured mounts
2. The mounted directory and all subdirectories have u+rwx,g+rwx permission bits; all files
Expand All @@ -184,7 +191,9 @@ def run(self, logger: logging.Logger, command: str, cpu: int, memory: int) -> No
raise Terminated()
self._running = True
try:
exit_status = self._run(logger, terminating, command, cpu, memory)
exit_status = self._run(
logger, terminating, command, cpu, memory_reservation, memory_limit
)
finally:
self._running = False

Expand All @@ -200,7 +209,8 @@ def _run(
terminating: Callable[[], bool],
command: str,
cpu: int,
memory: int,
memory_reservation: int,
memory_limit: int,
) -> int:
# run command in container & return exit status
raise NotImplementedError()
Expand Down Expand Up @@ -403,7 +413,8 @@ def _run(
terminating: Callable[[], bool],
command: str,
cpu: int,
memory: int,
memory_reservation: int,
memory_limit: int,
) -> int:
self._observed_states = set()
with open(os.path.join(self.host_dir, "command"), "x") as outfile:
Expand All @@ -423,7 +434,9 @@ def _run(

# connect to dockerd
client = docker.from_env(timeout=900)
resources, user, groups = self.misc_config(logger, client, cpu, memory)
resources, user, groups = self.misc_config(
logger, client, cpu, memory_reservation, memory_limit
)
svc = None
exit_code = None
try:
Expand Down Expand Up @@ -532,15 +545,22 @@ def touch_mount_point(container_file: str) -> None:
return mounts

def misc_config(
self, logger: logging.Logger, client: docker.DockerClient, cpu: int, memory: int
self,
logger: logging.Logger,
client: docker.DockerClient,
cpu: int,
memory_reservation: int,
memory_limit: int,
) -> Tuple[Optional[Dict[str, str]], Optional[str], List[str]]:
resources = {}
if cpu > 0:
# the cpu unit expected by swarm is "NanoCPUs"
resources["cpu_limit"] = cpu * 1_000_000_000
resources["cpu_reservation"] = cpu * 1_000_000_000
if memory > 0:
resources["mem_reservation"] = memory
if memory_reservation > 0:
resources["mem_reservation"] = memory_reservation
if memory_limit > 0:
resources["mem_limit"] = memory_limit
if resources:
logger.debug(_("docker resources", **resources))
resources = docker.types.Resources(**resources)
Expand Down Expand Up @@ -993,7 +1013,11 @@ def _eval_task_runtime(
)
)
memory_bytes = memory_max
ans["memory"] = memory_bytes
ans["memory_reservation"] = memory_bytes

memory_limit_multiplier = cfg["task_runtime"].get_float("memory_limit_multiplier")
if memory_limit_multiplier > 0.0:
ans["memory_limit"] = int(memory_limit_multiplier * memory_bytes)

if "maxRetries" in runtime_values:
ans["maxRetries"] = max(0, runtime_values["maxRetries"].coerce(Type.Int()).value)
Expand Down Expand Up @@ -1028,7 +1052,11 @@ def _try_task(
try:
# start container & run command
return container.run(
logger, command, int(runtime.get("cpu", 0)), int(runtime.get("memory", 0))
logger,
command,
int(runtime.get("cpu", 0)),
int(runtime.get("memory_reservation", 0)),
int(runtime.get("memory_limit", 0)),
)
except Exception as exn:
if isinstance(exn, Terminated) or prevRetries >= maxRetries:
Expand Down
26 changes: 26 additions & 0 deletions tests/test_4taskrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,32 @@ def test_runtime_memory(self):
self._test_task(txt, {"memory": "1Gaga"}, expected_exception=WDL.Error.EvalError)
self._test_task(txt, {"memory": "bogus"}, expected_exception=WDL.Error.EvalError)

def test_runtime_memory_limit(self):
txt = R"""
version 1.0
task limit {
input {
String memory
}
command <<<
cat /sys/fs/cgroup/memory/memory.limit_in_bytes
>>>
output {
Int memory_limit_in_bytes = read_int(stdout())
}
runtime {
cpu: 1
memory: "~{memory}"
}
}
"""
cfg = WDL.runtime.config.Loader(logging.getLogger(self.id()), [])
outputs = self._test_task(txt, {"memory": "256MB"}, cfg=cfg)
self.assertGreater(outputs["memory_limit_in_bytes"], 300*1024*1024)
cfg.override({"task_runtime": {"memory_limit_multiplier": 0.9}})
outputs = self._test_task(txt, {"memory": "256MB"}, cfg=cfg)
self.assertLess(outputs["memory_limit_in_bytes"], 300*1024*1024)

def test_input_files_rw(self):
txt = R"""
version 1.0
Expand Down

0 comments on commit 866a834

Please sign in to comment.