Drop MultiProcessing #239

Merged · 10 commits · Feb 28, 2024
98 changes: 72 additions & 26 deletions daliuge-engine/dlg/apps/app_base.py
@@ -1,5 +1,7 @@
from abc import ABC, abstractmethod
from collections import OrderedDict
from typing import List
from concurrent.futures import Future
from typing import List, Callable
import logging
import math
import threading
@@ -15,7 +17,6 @@
from dlg.utils import object_tracking
from dlg.exceptions import InvalidDropException, InvalidRelationshipException

from dlg.process import DlgProcess
from dlg.meta import (
dlg_int_param,
)
Expand All @@ -24,6 +25,56 @@

track_current_drop = object_tracking("drop")


class DropRunner(ABC):
"""An executor for `run()`-ing an AppDROP"""

@abstractmethod
def run_drop(self, app_drop: "AppDROP") -> Future:
"""Executes `app_drop.run()`, returning a future with the result."""
raise NotImplementedError


class SyncDropRunner(DropRunner):
"""
A simple runner that executes synchronously.
"""

def run_drop(self, app_drop: "AppDROP") -> Future:
"""Run drop synchronously."""
future = Future()

try:
res = app_drop.run()
future.set_result(res)
except BaseException as e:
future.set_exception(e)

return future


def run_on_daemon_thread(func: Callable, *args, **kwargs) -> Future:
"""Runs a callable on a daemon thread, meaning it will be
ungracefully terminated if the process ends."""
future = Future()

def thread_target():
try:
res = func(*args, **kwargs)
future.set_result(res)
except BaseException as e:
future.set_exception(e)

t = threading.Thread(target=thread_target)
t.daemon = True
t.start()

return future


_SYNC_DROP_RUNNER = SyncDropRunner()


# ===============================================================================
# AppDROP classes follow
# ===============================================================================
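The DropRunner abstraction is what lets this PR drop the multiprocessing machinery: anything that turns app_drop.run() into a Future can act as an executor. As a hypothetical illustration (the PR itself only ships SyncDropRunner and the daemon-thread helper above), a pool-backed runner could look like this:

    from concurrent.futures import Future, ThreadPoolExecutor

    class PoolDropRunner(DropRunner):
        """Sketch of a runner backed by a shared thread pool."""

        def __init__(self, max_workers: int = 4):
            self._pool = ThreadPoolExecutor(max_workers=max_workers)

        def run_drop(self, app_drop: "AppDROP") -> Future:
            # submit() returns a Future that captures the result or the
            # exception raised by app_drop.run(), matching the contract above
            return self._pool.submit(app_drop.run)

A drop would opt in with drop._drop_runner = PoolDropRunner(), overriding the synchronous default assigned in initialize().
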
@@ -54,6 +105,15 @@ class AppDROP(ContainerDROP):
a streaming input); for these cases see the `BarrierAppDROP`.
"""

def __getstate__(self):
state = super().__getstate__()
del state["_drop_runner"]
return state

def __setstate__(self, state):
super().__setstate__(state)
self._drop_runner = _SYNC_DROP_RUNNER
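
Runners may hold threads or pools that cannot cross process boundaries, which is why __getstate__ strips the runner and __setstate__ falls back to the synchronous default. A minimal round-trip, assuming the rest of the drop's state is otherwise picklable:

    import pickle

    clone = pickle.loads(pickle.dumps(app_drop))  # app_drop: any AppDROP instance
    assert clone._drop_runner is _SYNC_DROP_RUNNER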

def initialize(self, **kwargs):
super(AppDROP, self).initialize(**kwargs)

@@ -76,6 +136,9 @@ def initialize(self, **kwargs):
# execution status.
self._execStatus = AppDROPStates.NOT_RUN

# by default run drops synchronously
self._drop_runner: DropRunner = _SYNC_DROP_RUNNER

@track_current_drop
def addInput(self, inputDrop, back=True):
uid = inputDrop.uid
@@ -393,15 +456,10 @@ def dropCompleted(self, uid, drop_state):
self.async_execute()

def async_execute(self):
# Return immediately, but schedule the execution of this app
# If we have been given a thread pool use that
if hasattr(self, "_tp"):
self._tp.apply_async(self._execute_and_log_exception)
else:
t = threading.Thread(target=self._execute_and_log_exception)
t.daemon = 1
t.start()
return t
# TODO Do we need another thread pool for this?
# Careful, trying to run this on the same threadpool as the
# DropRunner can cause deadlocks
return run_on_daemon_thread(self._execute_and_log_exception)
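
The deadlock warned about above is the classic bounded-pool trap: execute() blocks on fut.result(), and if that blocking call occupies the only worker that could run the drop, neither side can progress. A standalone sketch of the hazard (not DALiuGE code):

    from concurrent.futures import ThreadPoolExecutor

    pool = ThreadPoolExecutor(max_workers=1)

    def outer():
        inner = pool.submit(lambda: "done")  # queued behind outer itself
        return inner.result()                # blocks forever: no free worker

    # pool.submit(outer).result()  # uncommenting this deadlocks

Running the completion chain on a short-lived daemon thread sidesteps this, at the cost of ungraceful termination on process exit.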

def _execute_and_log_exception(self):
try:
@@ -411,8 +469,6 @@ def _execute_and_log_exception(self):
"Unexpected exception during drop (%r) execution", self
)

_dlg_proc_lock = threading.Lock()

@track_current_drop
def execute(self, _send_notifications=True):
"""
@@ -432,19 +488,9 @@
self.execStatus = AppDROPStates.RUNNING
while tries < self.n_tries:
try:
if hasattr(self, "_tp"):
proc = DlgProcess(target=self.run, daemon=True)
# see YAN-975 for why this is happening
lock = InputFiredAppDROP._dlg_proc_lock
with lock:
proc.start()
with lock:
proc.join()
proc.close()
if proc.exception:
raise proc.exception
else:
self.run()
fut = self._drop_runner.run_drop(self)
fut.result()

if self.execStatus == AppDROPStates.CANCELLED:
return
self.execStatus = AppDROPStates.FINISHED
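
The except/retry arm of this loop is elided from the diff, but the visible tries/n_tries counters imply the usual future-based retry shape, roughly (a hedged reconstruction, not the verbatim source):

    tries = 0
    while tries < self.n_tries:
        try:
            fut = self._drop_runner.run_drop(self)
            fut.result()   # blocks, re-raising any exception from run()
            break          # success
        except Exception:
            tries += 1     # presumably logging and, on exhaustion, flagging an error state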
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/apps/bash_shell_app.py
@@ -258,8 +258,8 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):
# Pass down daliuge-specific information to the subprocesses as environment variables
env = os.environ.copy()
env.update({"DLG_UID": self._uid})
if self._dlg_session:
env.update({"DLG_SESSION_ID": self._dlg_session.sessionId})
if self._dlg_session_id:
env.update({"DLG_SESSION_ID": self._dlg_session_id})

env.update({"DLG_ROOT": utils.getDlgDir()})

8 changes: 3 additions & 5 deletions daliuge-engine/dlg/apps/dockerapp.py
@@ -364,9 +364,7 @@ def initialize(self, **kwargs):
"WorkingDir", None
)
# self.workdir = None
self._sessionId = (
self._dlg_session.sessionId if self._dlg_session else ""
)
self._sessionId = self._dlg_session_id
if not self.workdir:
default_workingdir = os.path.join(
utils.getDlgWorkDir(), self._sessionId
@@ -492,8 +490,8 @@ def run(self):
# deal with environment variables
env = {}
env.update({"DLG_UID": self._uid})
if self._dlg_session:
env.update({"DLG_SESSION_ID": self._dlg_session.sessionId})
if self._dlg_session_id:
env.update({"DLG_SESSION_ID": self._dlg_session_id})
if self._user is not None:
env.update({"USER": self._user, "DLG_ROOT": utils.getDlgDir()})
if self._env is not None:
25 changes: 5 additions & 20 deletions daliuge-engine/dlg/apps/dynlib.py
@@ -26,9 +26,8 @@
import multiprocessing
import queue
import threading
import six

from .. import rpc, utils
from .. import rpc
from ..ddap_protocol import AppDROPStates
from ..apps.app_base import AppDROP, BarrierAppDROP
from ..exceptions import InvalidDropException
@@ -440,9 +439,7 @@ def advance_step(f, *args, **kwargs):
client.start()

def setup_drop_proxies(inputs, outputs):
to_drop_proxy = lambda x: rpc.DropProxy(
client, x[0], x[1], x[2], x[3]
)
to_drop_proxy = lambda proxy_info: rpc.DropProxy(client, proxy_info)
inputs = [to_drop_proxy(i) for i in inputs]
outputs = [to_drop_proxy(o) for o in outputs]
return inputs, outputs
@@ -498,15 +495,15 @@ def initialize(self, **kwargs):
self.proc = None

def run(self):
if not hasattr(self, "_rpc_server"):
if self._rpc_endpoint is None:
raise Exception("DynlibProcApp can only run within an RPC server")

# On the sub-process we create DropProxy objects, so we need to extract
# from our inputs/outputs their contact point (RPC-wise) information.
# If one of our inputs/outputs is a DropProxy we already have this
# information; otherwise we must figure it out.
inputs = [self._get_proxy_info(i) for i in self.inputs]
outputs = [self._get_proxy_info(o) for o in self.outputs]
inputs = [rpc.ProxyInfo.from_data_drop(i) for i in self.inputs]
outputs = [rpc.ProxyInfo.from_data_drop(o) for o in self.outputs]

logger.info("Starting new process to run the dynlib on")
queue = multiprocessing.Queue()
@@ -537,18 +534,6 @@ def run(self):
finally:
self.proc.join(self.timeout)

def _get_proxy_info(self, x):
if isinstance(x, rpc.DropProxy):
return x.hostname, x.port, x.session_id, x.uid

# TODO: we can't use the NodeManager's host directly here, as that
# indicates the address the different servers *bind* to
# (and, for example, can be 0.0.0.0)
rpc_server = x._rpc_server
host, port = rpc_server._rpc_host, rpc_server._rpc_port
host = utils.to_externally_contactable_host(host, prefer_local=True)
return (host, port, x._dlg_session.sessionId, x.uid)
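
The removed _get_proxy_info() returned a bare (host, port, session_id, uid) tuple; the new code centralises this in rpc.ProxyInfo.from_data_drop(). The real definition lives in dlg/rpc.py and is not shown in this diff, but judging from the removed helper it plausibly looks like:

    from typing import NamedTuple

    class ProxyInfo(NamedTuple):
        # plausible shape, inferred from the tuple the old helper returned
        hostname: str
        port: int
        session_id: str
        uid: str

        @classmethod
        def from_data_drop(cls, drop) -> "ProxyInfo":
            ...  # resolve an externally contactable RPC host/port for the drop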

def cancel(self):
BarrierAppDROP.cancel(self)
try:
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/apps/pyfunc.py
@@ -360,8 +360,8 @@ def initialize(self, **kwargs):

env = os.environ.copy()
env.update({"DLG_UID": self._uid})
if self._dlg_session:
env.update({"DLG_SESSION_ID": self._dlg_session.sessionId})
if self._dlg_session_id:
env.update({"DLG_SESSION_ID": self._dlg_session_id})

self._applicationArgs = self._popArg(kwargs, "applicationArgs", {})

2 changes: 1 addition & 1 deletion daliuge-engine/dlg/data/drops/data_base.py
@@ -385,7 +385,7 @@ def get_dir(self, dirname):
return dirname

parts = []
if self._dlg_session:
if self._dlg_session_id:
parts.append(".")
else:
parts.append("/tmp/daliuge_tfiles")
41 changes: 32 additions & 9 deletions daliuge-engine/dlg/drop.py
@@ -23,6 +23,7 @@
Module containing the core DROP classes.
"""
import ast
import collections
import inspect
import logging
import os
@@ -31,6 +32,7 @@
import re
import sys
from abc import ABCMeta
from typing import Optional, Tuple

from dlg.common.reproducibility.constants import (
ReproducibilityFlags,
@@ -164,6 +166,33 @@ class AbstractDROP(EventFirer, EventHandler):
# - Subclasses implement methods decorated with @abstractmethod
__metaclass__ = ABCMeta

# Matcher used to validate environment_variable_syntax
_env_var_matcher = re.compile(r"\$[A-z|\d]+\..+")
_dlg_var_matcher = re.compile(r"\$DLG_.+")

_known_locks = ('_finishedProducersLock', '_refLock')
_known_rlocks = ('_statusLock',)

_rpc_endpoint: Optional[Tuple[str, int]] = None

def __getstate__(self):
state = self.__dict__.copy()
for attr_name in AbstractDROP._known_locks + AbstractDROP._known_rlocks:
del state[attr_name]
del state["_listeners"]
return state

def __setstate__(self, state):
for attr_name in AbstractDROP._known_locks:
state[attr_name] = threading.Lock()
for attr_name in AbstractDROP._known_rlocks:
state[attr_name] = threading.RLock()

self.__dict__.update(state)

self._listeners = collections.defaultdict(list)
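
Stripping and recreating the locks is necessary because thread locks are simply not picklable:

    import pickle
    import threading

    pickle.dumps(threading.Lock())
    # TypeError: cannot pickle '_thread.lock' object

Enumerating them in _known_locks/_known_rlocks keeps __getstate__ and __setstate__ symmetric: the same names are deleted on the way out and rebuilt fresh on the way in.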


@track_current_drop
def __init__(self, oid, uid, **kwargs):
"""
@@ -193,11 +222,11 @@ def __init__(self, oid, uid, **kwargs):
# by the drop category when generating the drop spec
self._type = self._popArg(kwargs, "categoryType", None)

# The Session owning this drop, if any
# The ID of the session owning this drop, if any
# In most real-world situations this attribute will be set, but in
# general it cannot be assumed it will (e.g., unit tests create drops
# directly outside the context of a session).
self._dlg_session = self._popArg(kwargs, "dlg_session", None)
self._dlg_session_id = self._popArg(kwargs, "dlg_session_id", "")

# A simple name that the Drop might receive
# This is usually set in the Logical Graph Editor,
@@ -221,10 +250,6 @@
self._producers_uids = set()
self._producers = ListAsDict(self._producers_uids)

# Matcher used to validate environment_variable_syntax
self._env_var_matcher = re.compile(r"\$[A-z|\d]+\..+")
self._dlg_var_matcher = re.compile(r"\$DLG_.+")

# Set holding the state of the producers that have finished their
# execution. Once all producers have finished, this DROP moves
# itself to the COMPLETED state
@@ -775,9 +800,7 @@ def _fire(self, eventType: str, **kwargs):
"""
kwargs["oid"] = self.oid
kwargs["uid"] = self.uid
kwargs["session_id"] = (
self._dlg_session.sessionId if self._dlg_session else ""
)
kwargs["session_id"] = self._dlg_session_id
kwargs["name"] = self.name
kwargs["lg_key"] = self.lg_key
self._fireEvent(eventType, **kwargs)