polish TaskContainer interface
mlin committed Jun 25, 2022
1 parent 0cba0a1 commit a026d3c
Showing 3 changed files with 47 additions and 42 deletions.
WDL/runtime/backend/cli_subprocess.py: 17 changes (3 additions & 14 deletions)
@@ -11,7 +11,7 @@
from contextlib import ExitStack
from ..._util import PygtailLogger
from ..._util import StructuredLogMessage as _
- from .. import config, _statusbar
+ from .. import config
from ..error import Terminated, DownloadFailed
from ..task_container import TaskContainer

@@ -71,6 +71,7 @@ def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command:
cli_log_filename = os.path.join(self.host_dir, f"{self.cli_name}.log.txt")
cli_log = cleanup.enter_context(open(cli_log_filename, "wb"))
cli_logger = logger.getChild(self.cli_name)
+ poll_stderr = cleanup.enter_context(self.poll_stderr_context(logger))
poll_cli_log = cleanup.enter_context(
PygtailLogger(
logger,
@@ -79,13 +80,6 @@ def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command:
level=logging.INFO,
)
)
- poll_stderr = cleanup.enter_context(
- PygtailLogger(
- logger,
- self.host_stderr_txt(),
- callback=self.stderr_callback,
- )
- )

# prepare command
with open(os.path.join(self.host_dir, "command"), "w") as outfile:
@@ -103,12 +97,7 @@ def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command:
logger.notice( # pyre-ignore
_(f"{self.cli_name} run", pid=proc.pid, log=cli_log_filename)
)
- cleanup.enter_context(
- _statusbar.task_running(
- self.runtime_values.get("cpu", 0),
- self.runtime_values.get("memory_reservation", 0),
- )
- )
+ cleanup.enter_context(self.task_running_context())

# long-poll for completion
exit_code = None
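With the two new helpers, a CLI-based backend's _run() no longer has to wire up PygtailLogger and _statusbar itself, as the diff above shows. A condensed sketch of the resulting pattern (not the verbatim miniwdl code; the subprocess call below is a hypothetical stand-in for whatever the concrete backend launches, e.g. the singularity or udocker CLI):

import contextlib
import logging
import subprocess
import time

def run_sketch(container, logger: logging.Logger, command: str) -> int:
    # `container` is assumed to be a TaskContainer subclass instance whose
    # runtime_values and host directory have already been prepared.
    with contextlib.ExitStack() as cleanup:
        # tail the task's stderr.txt into the logger (or container.stderr_callback)
        poll_stderr = cleanup.enter_context(container.poll_stderr_context(logger))

        # hypothetical stand-in for the backend-specific container launch
        proc = subprocess.Popen(["sh", "-c", command])

        # count the task in the CLI status bar only once it's actually running
        cleanup.enter_context(container.task_running_context())

        # long-poll for completion, surfacing new stderr lines as they appear
        while proc.poll() is None:
            poll_stderr()
            time.sleep(1)
        poll_stderr()
        return proc.returncode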
WDL/runtime/backend/docker_swarm.py: 22 changes (4 additions & 18 deletions)
@@ -18,12 +18,9 @@
from typing import List, Dict, Set, Optional, Any, Callable, Tuple, Iterable
import docker
from ... import Error
- from ..._util import (
- chmod_R_plus,
- PygtailLogger,
- )
+ from ..._util import chmod_R_plus
from ..._util import StructuredLogMessage as _
- from .. import config, _statusbar
+ from .. import config
from ..error import Interrupted, Terminated
from ..task_container import TaskContainer

@@ -232,13 +229,7 @@ def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command:

# stream stderr into log
with contextlib.ExitStack() as cleanup:
- poll_stderr = cleanup.enter_context(
- PygtailLogger(
- logger,
- self.host_stderr_txt(),
- callback=self.stderr_callback,
- )
- )
+ poll_stderr = cleanup.enter_context(self.poll_stderr_context(logger))

# poll for container exit
running_states = {"preparing", "running"}
Expand Down Expand Up @@ -277,12 +268,7 @@ def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command:
# indicate actual container start in status bar
# 'preparing' is when docker is pulling and extracting the image, which can
# be a lengthy and somewhat intensive operation, so we count it as running.
- cleanup.enter_context(
- _statusbar.task_running(
- self.runtime_values.get("cpu", 0),
- self.runtime_values.get("memory_reservation", 0),
- )
- )
+ cleanup.enter_context(self.task_running_context())
was_running = True
if "running" in self._observed_states:
poll_stderr()
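The swarm backend makes the same two substitutions, but as the comment above notes, it only enters task_running_context() once the service is observed in a running-like state, so queued tasks do not inflate the status bar. Schematically (a simplified sketch; poll_service_state is a hypothetical stand-in for the docker-py service polling, and poll_stderr comes from poll_stderr_context as above):

import contextlib
import time

def await_service_sketch(container, poll_service_state, poll_stderr) -> None:
    # poll_service_state() is assumed to return the swarm task state, e.g.
    # "pending", "preparing", "running", "complete", "failed", ...
    running_states = {"preparing", "running"}
    was_running = False
    with contextlib.ExitStack() as cleanup:
        while True:
            state = poll_service_state()
            if not was_running and state in running_states:
                # 'preparing' (image pull/extract) counts as running for the ticker
                cleanup.enter_context(container.task_running_context())
                was_running = True
            if state == "running":
                poll_stderr()
            if state not in running_states and state != "pending":
                break
            time.sleep(1)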
WDL/runtime/task_container.py: 50 changes (40 additions & 10 deletions)
@@ -1,21 +1,22 @@
"""
- Abstract interface for task container runtime + default Docker Swarm backend
+ Abstract interface for task container runtime
"""
import os
import logging
import shutil
import threading
- from typing import Callable, Iterable, List, Set, Tuple, Type, Any, Dict, Optional
+ from typing import Callable, Iterable, List, Set, Type, Any, Dict, Optional, ContextManager
from abc import ABC, abstractmethod
from contextlib import suppress
from .. import Error
from .._util import (
TerminationSignalFlag,
path_really_within,
rmtree_atomic,
+ PygtailLogger,
)
from .._util import StructuredLogMessage as _
- from . import config
+ from . import config, _statusbar
from .error import OutputError, Terminated, CommandFailed


Expand All @@ -29,16 +30,16 @@ class TaskContainer(ABC):
@classmethod
def global_init(cls, cfg: config.Loader, logger: logging.Logger) -> None:
"""
- Perform any necessary one-time initialization of the underlying container backend. Must be
+ Perform any necessary one-time initialization of the underlying container backend. To be
invoked once per process prior to any instantiation of the class.
"""
raise NotImplementedError()

@classmethod
def detect_resource_limits(cls, cfg: config.Loader, logger: logging.Logger) -> Dict[str, int]:
"""
- Detect the maximum resources (cpu and mem_bytes) that the underlying container backend
- would be able to provision.
+ Detect the maximum resources ("cpu" and "mem_bytes") that the underlying container backend
+ will be able to provision for any one task.
If determining this is at all costly, then backend should memoize (thread-safely and
perhaps front-loaded in global_init).
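To illustrate the memoization advice, a backend could cache the probe behind a lock roughly as follows (a hypothetical sketch, not miniwdl code; the os.sysconf probe assumes a POSIX host, and a real backend would usually query its container runtime instead):

import logging
import os
import threading
from typing import Dict

_limits_lock = threading.Lock()
_limits_cache: Dict[str, int] = {}

def detect_resource_limits_sketch(logger: logging.Logger) -> Dict[str, int]:
    # probe once, thread-safely; subsequent calls return the cached result
    with _limits_lock:
        if not _limits_cache:
            _limits_cache["cpu"] = os.cpu_count() or 1
            _limits_cache["mem_bytes"] = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")
            logger.info("detected resource limits: %s", _limits_cache)
        return dict(_limits_cache)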
@@ -74,21 +75,25 @@ def detect_resource_limits(cls, cfg: config.Loader, logger: logging.Logger) -> D
"""

input_path_map_rev: Dict[str, str]
"""
Inverse of ``input_path_map`` (also maintained by ``add_paths``)
"""

try_counter: int
"""
:type: int
Counter for number of retries; starts at 1 on the first attempt. On subsequent attempts, the
- names (on the host) of the working directory, stdout.txt, and stderr.txt will incorporate the
+ names (on the host) of the working directory, stdout.txt, and stderr.txt may incorporate the
count, to ensure their uniqueness.
"""

runtime_values: Dict[str, Any]
"""
- Evaluted task runtime{} section. Typically the TaskContainer backend needs to honor cpu,
- memory_limit, memory_reservation, docker. Resources must have already been fit to
- get_resource_limits(). Retry logic (maxRetries, preemptible) is handled externally.
+ Evaluated task runtime{} section, to be populated externally before invoking run(). Typically the
+ TaskContainer backend needs to honor cpu, memory_limit, memory_reservation, docker. Resources
+ must have already been fit to get_resource_limits(). Retry logic (maxRetries, preemptible) is
+ handled externally.
"""

stderr_callback: Optional[Callable[[str], None]]
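To make the runtime_values contract above concrete: by the time run() is invoked, the caller has populated something along these lines (illustrative values only, already fitted to detect_resource_limits(); the key names are the ones the docstring lists):

runtime_values = {
    "cpu": 4,                           # cores requested by the task's runtime{} section
    "memory_reservation": 2 * 1024**3,  # bytes to reserve
    "memory_limit": 4 * 1024**3,        # hard cap in bytes, if the backend enforces one
    "docker": "ubuntu:20.04",           # container image (illustrative)
}

On a retry (try_counter == 2), the host-side output filenames pick up the counter, e.g. stderr2.txt, as host_stderr_txt() below shows.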
@@ -359,6 +364,31 @@ def host_stderr_txt(self):
self.host_dir, f"stderr{self.try_counter if self.try_counter > 1 else ''}.txt"
)

+ def poll_stderr_context(self, logger: logging.Logger) -> ContextManager[Callable[[], None]]:
+ """
+ Implementation helper: open a context yielding a function to poll stderr.txt and log each
+ new line (to either logger or self.stderr_callback, if set). The _run() implementation should
+ call the function periodically while the container is running, and close the context once
+ done/failed.
+ """
+ return PygtailLogger(
+ logger,
+ self.host_stderr_txt(),
+ callback=self.stderr_callback,
+ )

+ def task_running_context(self) -> ContextManager[None]:
+ """
+ Implementation helper: open a context which counts the task, and its CPU and memory
+ reservations, in the CLI status bar's "running" ticker. The _run() implementation should open
+ this context once the container is truly running (not while e.g. still queued), and close
+ it once done/failed.
+ """
+ return _statusbar.task_running(
+ self.runtime_values.get("cpu", 0),
+ self.runtime_values.get("memory_reservation", 0),
+ )


_backends: Dict[str, Type[TaskContainer]] = dict()
_backends_lock: threading.Lock = threading.Lock()
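The module keeps a process-wide registry mapping backend names to TaskContainer subclasses (the _backends dict guarded by _backends_lock above). Purely as an illustration of how such a registry is used, a registration helper in the same spirit might look like this (hypothetical sketch; in practice backends are wired in through miniwdl's configuration and plugin machinery rather than by touching the private dict directly):

import threading
from typing import Dict, Type

# stand-ins for WDL.runtime.task_container._backends / _backends_lock
_backends: Dict[str, Type["TaskContainer"]] = dict()
_backends_lock: threading.Lock = threading.Lock()

def register_backend_sketch(name: str, backend_cls: Type["TaskContainer"]) -> None:
    # thread-safe insertion, mirroring the module's own lock discipline
    with _backends_lock:
        assert name not in _backends, f"container backend {name!r} already registered"
        _backends[name] = backend_cls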
