Skip to content

Commit

Permalink
Bring back threadpool-based parallel drop execution
Browse files Browse the repository at this point in the history
The current implementation of multi-process drop execution was flawed
because it created a process per drop instance, leaving the fate of the
underlying system to the mercy of the size of the graph being executed.
Because of this, RPC clients were also created for every remote drop
instead of for every single remote host, which again can exhaust
resources in the host fairly rapidly.

This commit removes all traces of multi-process drop execution, bringing
back the simpler-but-reliable thread-based drop execution. By default
drops create their own threads when wanting to async_execute, although
differently from before this now happens via a WorkerPool class that
takes care of doing this. The NodeManager overrides that behavior by
injecting its own WorkerPool that is *always* backed by a thread pool
(which wasn't the case before: we let the thread count go wild).

Signed-off-by: Rodrigo Tobar <rtobar@icrar.org>
  • Loading branch information
rtobar committed May 23, 2023
1 parent 4312257 commit 2ac077c
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 89 deletions.
62 changes: 37 additions & 25 deletions daliuge-engine/dlg/apps/app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from dlg.utils import object_tracking
from dlg.exceptions import InvalidDropException, InvalidRelationshipException

from dlg.process import DlgProcess
from dlg.meta import (
dlg_int_param,
)
Expand All @@ -24,6 +23,38 @@

track_current_drop = object_tracking("drop")


class WorkerPool:
    """
    Interface for objects that schedule app drops for asynchronous execution
    and that execute them.

    Both methods are no-ops in this base class.
    """

    def async_execute(self, app_drop):
        """Schedule the execution of the given app drop (no-op here)."""
        return None

    def run_app_drop(self, app_drop):
        """Execute the given app drop's run method (no-op here)."""
        return None


class SimpleWorkerPool(WorkerPool):
    """
    Pool-like object that does no actual pooling: each scheduled app drop gets
    a brand new daemon thread, and executing a drop calls its run method.
    """

    def async_execute(self, app_drop):
        """Start a daemon thread that executes the drop and return the thread."""
        worker = threading.Thread(
            target=app_drop._execute_and_log_exception, daemon=True
        )
        worker.start()
        # TODO: wrap in an mp.pool.AsyncResult or otherwise make the result type
        # the same across both WorkerPools so we can use it on the caller site.
        return worker

    def run_app_drop(self, app_drop):
        """Call the drop's run method directly and return whatever it returns."""
        result = app_drop.run()
        return result


# Module-level SimpleWorkerPool instance; it is assigned as the default
# `_worker_pool` during drop initialization.
_SIMPLE_WORKER_POOL = SimpleWorkerPool()


# ===============================================================================
# AppDROP classes follow
# ===============================================================================
Expand Down Expand Up @@ -76,6 +107,9 @@ def initialize(self, **kwargs):
# execution status.
self._execStatus = AppDROPStates.NOT_RUN

# by default spawn threads to execute drops asynchronously
self._worker_pool: WorkerPool = _SIMPLE_WORKER_POOL

@track_current_drop
def addInput(self, inputDrop, back=True):
uid = inputDrop.uid
Expand Down Expand Up @@ -393,15 +427,7 @@ def dropCompleted(self, uid, drop_state):
self.async_execute()

def async_execute(self):
# Return immediately, but schedule the execution of this app
# If we have been given a thread pool use that
if hasattr(self, "_tp"):
self._tp.apply_async(self._execute_and_log_exception)
else:
t = threading.Thread(target=self._execute_and_log_exception)
t.daemon = 1
t.start()
return t
return self._worker_pool.async_execute(self)

def _execute_and_log_exception(self):
try:
Expand All @@ -411,8 +437,6 @@ def _execute_and_log_exception(self):
"Unexpected exception during drop (%r) execution", self
)

_dlg_proc_lock = threading.Lock()

@track_current_drop
def execute(self, _send_notifications=True):
"""
Expand All @@ -432,19 +456,7 @@ def execute(self, _send_notifications=True):
self.execStatus = AppDROPStates.RUNNING
while tries < self.n_tries:
try:
if hasattr(self, "_tp"):
proc = DlgProcess(target=self.run, daemon=True)
# see YAN-975 for why this is happening
lock = InputFiredAppDROP._dlg_proc_lock
with lock:
proc.start()
with lock:
proc.join()
proc.close()
if proc.exception:
raise proc.exception
else:
self.run()
self._worker_pool.run_app_drop(self)
if self.execStatus == AppDROPStates.CANCELLED:
return
self.execStatus = AppDROPStates.FINISHED
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/manager/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def dlgNM(parser, args):
action="store",
type="int",
dest="max_threads",
help="Max thread pool size used for executing drops. -1 means use all CPUs. 0 (default) means no threads.",
help="Max thread pool size used for executing drops. <= 0 means use all (physical) CPUs. Default is 0.",
default=0,
)
(options, args) = parser.parse_args(args)
Expand Down
64 changes: 30 additions & 34 deletions daliuge-engine/dlg/manager/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,15 @@
import sys
import threading
import time
import typing

from . import constants
from .drop_manager import DROPManager
from .session import Session

if sys.version_info >= (3, 8):
from .shared_memory_manager import DlgSharedMemoryManager
from .. import rpc, utils
from ..ddap_protocol import DROPStates
from ..apps.app_base import AppDROP
from dlg.data.drops.memory import InMemoryDROP, SharedMemoryDROP
from ..apps.app_base import AppDROP, SimpleWorkerPool
from ..exceptions import (
NoSessionException,
SessionAlreadyExistsException,
Expand Down Expand Up @@ -112,6 +110,31 @@ def _load(obj, callable_attr):
return obj


class NodeManagerWorkerPool(SimpleWorkerPool):
    """
    Worker pool that is always backed by a fixed-size thread pool.

    App drops are scheduled onto a ``multiprocessing.pool.ThreadPool`` rather
    than each getting a new thread, which bounds the number of threads used
    for drop execution.
    """

    # Created unconditionally in __init__, so it is never None afterwards.
    _pool: multiprocessing.pool.ThreadPool

    def __init__(self, max_workers):
        """
        Create the underlying thread pool.

        :param max_workers: number of worker threads. A value <= 0 means
            "use the number of physical CPUs".
        """
        if max_workers <= 0:
            # NOTE(review): cpu_count is presumably psutil's, whose result can
            # be None when the count cannot be determined -- confirm. Fall
            # back to the logical count, then to a single worker, so both the
            # "%d" log format and the pool receive a positive integer.
            max_workers = cpu_count(logical=False) or cpu_count() or 1
        logger.info("Initializing thread pool with %d workers", max_workers)
        self._pool = multiprocessing.pool.ThreadPool(processes=max_workers)

    def async_execute(self, app_drop):
        """Submit the drop's execution to the thread pool.

        :return: the ``AsyncResult`` returned by ``apply_async``.
        """
        return self._pool.apply_async(app_drop._execute_and_log_exception)

    def run_app_drop(self, app_drop):
        """Execute the app drop's run method and return its result."""
        # Previously spelled `_run_app_drop`, which never overrode the
        # WorkerPool interface method; the inherited implementation was the
        # one actually used, and it does the same thing.
        return app_drop.run()

    def close(self):
        """Close the thread pool and wait for its workers to exit."""
        self._pool.close()
        self._pool.join()

class NodeManagerBase(DROPManager):
"""
Base class for a DROPManager that creates and holds references to DROPs.
Expand Down Expand Up @@ -172,21 +195,7 @@ def __init__(
_load(l, "handleEvent") for l in event_listeners
]

# Start thread pool
self._threadpool = None
if max_threads == -1: # default use all CPU cores
max_threads = cpu_count(logical=False)
else: # never more than 200
max_threads = max(min(max_threads, 200), 1)
if sys.version_info >= (3, 8):
self._memoryManager = DlgSharedMemoryManager()
if max_threads > 1:
logger.info(
"Initializing thread pool with %d threads", max_threads
)
self._threadpool = multiprocessing.pool.ThreadPool(
processes=max_threads
)
self._worker_pool = NodeManagerWorkerPool(max_threads)

# Event handler that only logs status changes
debugging = logger.isEnabledFor(logging.DEBUG)
Expand All @@ -198,9 +207,7 @@ def start(self):

def shutdown(self):
self._dlm.cleanup()
if self._threadpool:
self._threadpool.close()
self._threadpool.join()
self._worker_pool.close()
super().shutdown()

def deliver_event(self, evt):
Expand Down Expand Up @@ -265,18 +272,7 @@ def deploySession(self, sessionId, completedDrops=[]):

def foreach(drop):
drop.autofill_environment_variables()
if self._threadpool is not None:
drop._tp = self._threadpool
if isinstance(drop, InMemoryDROP):
drop._sessID = sessionId
self._memoryManager.register_drop(drop.uid, sessionId)
elif isinstance(drop, SharedMemoryDROP):
if sys.version_info < (3, 8):
raise NotImplementedError(
"Shared memory is not implemented when using Python < 3.8"
)
drop._sessID = sessionId
self._memoryManager.register_drop(drop.uid, sessionId)
drop._worker_pool = self._worker_pool
self._dlm.addDrop(drop)

# Remote event forwarding
Expand Down
22 changes: 1 addition & 21 deletions daliuge-engine/dlg/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,25 +310,9 @@ class DropProxy(object):
"""

def __init__(self, rpc_client, proxy_info: ProxyInfo):
# The current version of multiprocessing support creates an RPCClient
# per DropProxy, disregarding the rpc_client parameter given here.
# This uses too many resources though, but is only needed if the NM is
# instructed to use multiprocessing support. To avoid this resource
# over-usage we then detect if the given rpc_client (an instance of
# NodeManagerBase) has been started with multiprocessing support (which
# is confusingly bound to there being a *thread* pool too) and only if
# we detect the situation we create our own RPCClient; otherwise we use
# the given rpc_client as is.
if hasattr(rpc_client, "_threadpool") and rpc_client._threadpool:
self.rpc_client = ZeroRPCClient()
self._own_rpc_client = True
else:
self.rpc_client = rpc_client
self._own_rpc_client = False
self.rpc_client = rpc_client
self._proxy_info: ProxyInfo = proxy_info
logger.debug("Created %r", self)
if self._own_rpc_client:
self.rpc_client.start()

def handleEvent(self, evt):
    """Receive an event and ignore it; no handling is performed here."""
    return None
Expand All @@ -348,7 +332,3 @@ def __getattr__(self, name):

def __repr__(self):
    """Return a short description of this proxy that includes its proxy info."""
    # The closing angle bracket was missing, leaving the text unbalanced.
    return f"<DropProxy with {self._proxy_info}>"

def __del__(self):
if self._own_rpc_client:
self.rpc_client.shutdown()
8 changes: 0 additions & 8 deletions daliuge-engine/test/manager/test_dm.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,11 +692,3 @@ def test_run_invalid_shmem_graph(self):
quickDeploy(dm, sessionID, graph)
self.assertEqual(1, len(dm._sessions[sessionID].drops))
dm.destroySession(sessionID)


@unittest.skipUnless(
os.environ.get("DALIUGE_RUN_MP_TESTS", "0") == "1",
"Unstable multiprocessing tests not run by default",
)
class TestDMParallel(NodeManagerTestsBase, unittest.TestCase):
nm_threads = multiprocessing.cpu_count()

0 comments on commit 2ac077c

Please sign in to comment.