parallel scheduling for singularity containers (#529)
mlin committed Nov 24, 2021
1 parent 8adb212 commit 59c7848
Showing 3 changed files with 185 additions and 33 deletions.
95 changes: 86 additions & 9 deletions WDL/runtime/backend/cli_subprocess.py
@@ -1,13 +1,16 @@
import os
import time
import psutil
import logging
import threading
import contextlib
import subprocess
from typing import Callable, List, Tuple
import multiprocessing
from typing import Callable, List, Tuple, Dict, Optional
from abc import abstractmethod, abstractproperty
from ..._util import PygtailLogger
from ..._util import StructuredLogMessage as _
from .. import _statusbar
from .. import config, _statusbar
from ..error import Terminated
from ..task_container import TaskContainer

@@ -19,14 +22,42 @@ class SubprocessBase(TaskContainer):
exact command line arguments for the respective implementation.
"""

_resource_limits: Optional[Dict[str, int]] = None
_bind_input_files: bool = True
_lock = threading.Lock()
_lock: threading.Lock = threading.Lock()

@classmethod
def detect_resource_limits(cls, cfg: config.Loader, logger: logging.Logger) -> Dict[str, int]:
if not cls._resource_limits:
cls._resource_limits = {
"cpu": multiprocessing.cpu_count(),
"mem_bytes": psutil.virtual_memory().total,
}
logger.info(
_(
"detected host resources",
cpu=cls._resource_limits["cpu"],
mem_bytes=cls._resource_limits["mem_bytes"],
)
)
_SubprocessScheduler.global_init(cls._resource_limits)
return cls._resource_limits

def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command: str) -> int:
with contextlib.ExitStack() as cleanup:
# global lock to run one container at a time
# (to be replaced by resource scheduling logic)
cleanup.enter_context(self._lock)
# await cpu & memory availability
cpu_reservation = self.runtime_values.get("cpu", 0)
memory_reservation = self.runtime_values.get("memory_reservation", 0)
scheduler = _SubprocessScheduler(cpu_reservation, memory_reservation)
cleanup.enter_context(scheduler)
logger.info(
_(
"provisioned",
seconds_waited=scheduler.delay,
cpu=cpu_reservation,
mem_bytes=memory_reservation,
)
)

# prepare loggers
cli_log_filename = os.path.join(self.host_dir, f"{self.cli_name}.log.txt")
@@ -58,7 +89,9 @@ def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command:
"-c",
"bash ../command >> ../stdout.txt 2>> ../stderr.txt",
]
proc = subprocess.Popen(invocation, stdout=cli_log, stderr=subprocess.STDOUT)
proc = subprocess.Popen(
invocation, stdout=cli_log, stderr=subprocess.STDOUT, cwd=self.host_dir
)
logger.notice( # pyre-ignore
_(f"{self.cli_name} run", pid=proc.pid, log=cli_log_filename)
)
@@ -145,5 +178,49 @@ def touch_mount_point(host_path: str) -> None:
mounts.append((container_path.rstrip("/"), host_path.rstrip("/"), False))
return mounts

# TODO: common resource scheduling logic (accounting for multiple concurrent miniwdl processes?)
# use old container-based way of detecting host resources

class _SubprocessScheduler(contextlib.AbstractContextManager):
"""
Logic for scheduling parallel containers to fit host cpu & memory resources
"""

_lock: threading.Lock = threading.Lock()
_cv: threading.Condition
_state: Dict[str, int] = {}
delay: int = 0

@classmethod
def global_init(cls, resource_limits: Dict[str, int]):
with cls._lock:
cls._cv = threading.Condition(cls._lock)
cls._state["host_cpu"] = resource_limits["cpu"]
cls._state["host_memory"] = resource_limits["mem_bytes"]
cls._state["used_cpu"] = 0
cls._state["used_memory"] = 0

def __init__(self, cpu_reservation: int, memory_reservation: int):
assert self._cv
assert 0 <= cpu_reservation <= self._state["host_cpu"]
assert 0 <= memory_reservation <= self._state["host_memory"]
self.cpu_reservation = cpu_reservation
self.memory_reservation = memory_reservation

def __enter__(self):
t0 = time.time()
with self._cv:
while (
self._state["used_cpu"] + self.cpu_reservation > self._state["host_cpu"]
or self._state["used_memory"] + self.memory_reservation > self._state["host_memory"]
):
self._cv.wait()
self._state["used_cpu"] = self._state["used_cpu"] + self.cpu_reservation
self._state["used_memory"] = self._state["used_memory"] + self.memory_reservation
self.delay = int(time.time() - t0)

def __exit__(self, *exc):
with self._cv:
self._state["used_cpu"] = self._state["used_cpu"] - self.cpu_reservation
assert 0 <= self._state["used_cpu"] <= self._state["host_cpu"]
self._state["used_memory"] = self._state["used_memory"] - self.memory_reservation
assert 0 <= self._state["used_memory"] <= self._state["host_memory"]
self._cv.notify()
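
The scheduler above admits a container only while the summed cpu and memory reservations fit within the host totals reported by detect_resource_limits, parking later arrivals on a condition variable until an earlier container releases its share. Below is a minimal standalone sketch of the same admit/release pattern, kept separate from miniwdl's classes (ResourceGate and run_container are hypothetical names used only for illustration):

import threading
import contextlib
import multiprocessing

import psutil  # same dependency the commit uses to detect host memory


class ResourceGate(contextlib.AbstractContextManager):
    """Hold entry until a cpu/memory reservation fits within the host totals.

    Simplified sketch of the pattern used by _SubprocessScheduler; not miniwdl API.
    """

    _cv = threading.Condition()
    _host = {"cpu": multiprocessing.cpu_count(), "mem_bytes": psutil.virtual_memory().total}
    _used = {"cpu": 0, "mem_bytes": 0}

    def __init__(self, cpu: int, mem_bytes: int):
        assert 0 <= cpu <= self._host["cpu"] and 0 <= mem_bytes <= self._host["mem_bytes"]
        self.cpu, self.mem_bytes = cpu, mem_bytes

    def __enter__(self):
        with self._cv:
            # block while admitting this reservation would oversubscribe the host
            while (
                self._used["cpu"] + self.cpu > self._host["cpu"]
                or self._used["mem_bytes"] + self.mem_bytes > self._host["mem_bytes"]
            ):
                self._cv.wait()
            self._used["cpu"] += self.cpu
            self._used["mem_bytes"] += self.mem_bytes
        return self

    def __exit__(self, *exc):
        with self._cv:
            self._used["cpu"] -= self.cpu
            self._used["mem_bytes"] -= self.mem_bytes
            self._cv.notify()  # wake one waiter to re-check availability


# usage: each worker thread wraps its container run in a reservation, e.g.
# with ResourceGate(cpu=4, mem_bytes=8 * 2**30):
#     run_container(...)

Releasing inside __exit__ and calling notify() under the same condition variable mirrors the class above: each time capacity is returned, one blocked waiter wakes and re-checks whether its reservation now fits.
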
88 changes: 64 additions & 24 deletions WDL/runtime/backend/singularity.py
@@ -1,16 +1,16 @@
import os
import time
import shlex
import psutil
import shutil
import logging
import tempfile
import threading
import subprocess
import multiprocessing
from typing import List, Dict, Callable, Optional
from .. import config
from typing import List, Callable, Optional
from ...Error import InputError
from ..._util import StructuredLogMessage as _
from ..._util import rmtree_atomic
from .. import config
from ..error import DownloadFailed
from .cli_subprocess import SubprocessBase


@@ -19,8 +19,9 @@ class SingularityContainer(SubprocessBase):
Singularity task runtime based on cli_subprocess.SubprocessBase
"""

_resource_limits: Dict[str, int]
_tempdir: Optional[str] = None
_pull_lock: threading.Lock = threading.Lock()
_pulled_images = set()

@classmethod
def global_init(cls, cfg: config.Loader, logger: logging.Logger) -> None:
Expand All @@ -40,31 +41,24 @@ def global_init(cls, cfg: config.Loader, logger: logging.Logger) -> None:
version=singularity_version.stdout.strip(),
)
)
cls._resource_limits = {
"cpu": multiprocessing.cpu_count(),
"mem_bytes": psutil.virtual_memory().total,
}
logger.info(
_(
"detected host resources",
cpu=cls._resource_limits["cpu"],
mem_bytes=cls._resource_limits["mem_bytes"],
)
)
pass

@classmethod
def detect_resource_limits(cls, cfg: config.Loader, logger: logging.Logger) -> Dict[str, int]:
return cls._resource_limits

@property
def cli_name(self) -> str:
return "singularity"

@property
def docker_uri(self) -> str:
return "docker://" + self.runtime_values.get(
"docker", self.cfg.get_dict("task_runtime", "defaults")["docker"]
)

def _cli_invocation(self, logger: logging.Logger) -> List[str]:
"""
Formulate `singularity run` command-line invocation
"""
self._singularity_pull(logger)

ans = ["singularity"]
if logger.isEnabledFor(logging.DEBUG):
ans.append("--verbose")
@@ -74,7 +68,6 @@ def _cli_invocation(self, logger: logging.Logger) -> List[str]:
os.path.join(self.container_dir, "work"),
]
ans += self.cfg.get_list("singularity", "cli_options")
docker_uri = "docker://" + self.runtime_values.get("docker", "ubuntu:20.04")

mounts = self.prepare_mounts()
# Also create a scratch directory and mount to /tmp and /var/tmp
@@ -89,7 +82,7 @@ def _cli_invocation(self, logger: logging.Logger) -> List[str]:
logger.info(
_(
"singularity invocation",
args=" ".join(shlex.quote(s) for s in (ans + [docker_uri])),
args=" ".join(shlex.quote(s) for s in (ans + [self.docker_uri])),
binds=len(mounts),
tmpdir=self._tempdir,
)
@@ -102,7 +95,7 @@ def _cli_invocation(self, logger: logging.Logger) -> List[str]:
if not writable:
bind_arg += ":ro"
ans.append(bind_arg)
ans.append(docker_uri)
ans.append(self.docker_uri)
return ans

def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command: str) -> int:
@@ -115,3 +108,50 @@ def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command:
if self._tempdir:
logger.info(_("delete container temporary directory", tmpdir=self._tempdir))
rmtree_atomic(self._tempdir)

def _singularity_pull(self, logger: logging.Logger):
"""
Ensure the needed docker image is cached by singularity. Use a global lock so we'll only
download it once, even if used by many parallel tasks all starting at the same time.
"""
t0 = time.time()
with self._pull_lock:
t1 = time.time()
docker_uri = self.docker_uri

if docker_uri in self._pulled_images:
logger.info(_("singularity image already pulled", uri=docker_uri))
return

with tempfile.TemporaryDirectory(prefix="miniwdl_sif_") as pulldir:
logger.info(_("begin singularity pull", uri=docker_uri, tempdir=pulldir))
puller = subprocess.run(
["singularity", "pull", docker_uri],
cwd=pulldir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
)
if puller.returncode != 0:
logger.error(
_(
"singularity pull failed",
stderr=puller.stderr.split("\n"),
stdout=puller.stdout.split("\n"),
)
)
raise DownloadFailed(docker_uri)
# The docker image layers are cached in SINGULARITY_CACHEDIR, so we don't need to
# keep {pulldir}/*.sif

self._pulled_images.add(docker_uri)

# TODO: log image sha256sum?
logger.notice(
_(
"singularity pull",
uri=docker_uri,
seconds_waited=int(t1 - t0),
seconds_pulling=int(time.time() - t1),
)
)
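
_singularity_pull serializes `singularity pull` behind a class-level lock and records completed URIs in _pulled_images, so many tasks starting at once download a shared image only once and later callers return immediately. A standalone sketch of that lock-guarded, do-once pattern (ImageCache and the echo placeholder are illustrative, not miniwdl API):

import threading
import subprocess


class ImageCache:
    """Run an expensive fetch at most once per key, even with parallel callers."""

    _lock = threading.Lock()
    _done: set = set()

    @classmethod
    def ensure(cls, uri: str) -> None:
        with cls._lock:              # serialize all fetches
            if uri in cls._done:     # later callers return immediately
                return
            # placeholder for the real work, e.g. ["singularity", "pull", uri]
            subprocess.run(["echo", "pulling", uri], check=True)
            cls._done.add(uri)


# threads racing on the same uri perform the fetch exactly once
threads = [
    threading.Thread(target=ImageCache.ensure, args=("docker://ubuntu:20.04",))
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

Holding the lock for the full pull keeps the logic simple at the cost of serializing pulls of different images; the timing fields logged above (seconds_waited vs. seconds_pulling) make that wait visible.
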
35 changes: 35 additions & 0 deletions tests/singularity.t
@@ -0,0 +1,35 @@
#!/bin/bash
# bash-tap tests for miniwdl's Singularity task runtime. `singularity` must be available.
set -o pipefail

cd "$(dirname $0)/.."
SOURCE_DIR="$(pwd)"

BASH_TAP_ROOT="tests/bash-tap"
source tests/bash-tap/bash-tap-bootstrap

export PYTHONPATH="$SOURCE_DIR:$PYTHONPATH"
miniwdl="python3 -m WDL"

if [[ -z $TMPDIR ]]; then
TMPDIR=/tmp
fi
DN=$(mktemp -d "${TMPDIR}/miniwdl_runner_tests_XXXXXX")
DN=$(realpath "$DN")
cd $DN
echo "$DN"

plan tests 2

export MINIWDL__SCHEDULER__CONTAINER_BACKEND=singularity

$miniwdl run_self_test --dir "$DN"
is "$?" "0" "run_self_test"

git clone --depth=1 https://github.com/broadinstitute/viral-pipelines.git
cd viral-pipelines

$miniwdl run pipes/WDL/workflows/assemble_denovo.wdl \
--path pipes/WDL/tasks --dir "$DN" --verbose \
-i test/input/WDL/test_inputs-assemble_denovo-local.json
is "$?" "0" "assemble_denovo success"
