Skip to content

Commit

Permalink
Merge branch 'yan-999'
Browse files Browse the repository at this point in the history
Signed-off-by: Rodrigo Tobar <rtobar@icrar.org>
  • Loading branch information
rtobar committed Jun 22, 2022
2 parents a75a982 + 677f62f commit 1b8c85f
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 100 deletions.
12 changes: 8 additions & 4 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,10 @@
import time
import re
import sys
import inspect
import binascii
from typing import List, Union

import numpy as np
import pyarrow.plasma as plasma
import six
from dlg.common.reproducibility.constants import (
ReproducibilityFlags,
REPRO_DEFAULT,
Expand All @@ -58,7 +55,6 @@
)
from dlg.common.reproducibility.reproducibility import common_hash
from merklelib import MerkleTree
from six import BytesIO

from .ddap_protocol import (
ExecutionMode,
Expand Down Expand Up @@ -1829,6 +1825,14 @@ class InMemoryDROP(DataDROP):
A DROP that points to data stored in memory.
"""

# Allow in-memory drops to be automatically removed by default
def __init__(self, *args, **kwargs):
    """
    Create an in-memory drop that is eligible for automatic removal.

    Unless the caller explicitly provides them, ``precious`` defaults to
    ``False`` and ``expireAfterUse`` defaults to ``True``. All positional
    and keyword arguments are then forwarded to the parent constructor.
    """
    # setdefault only fills in a value when the key is absent, so any
    # explicit choice made by the caller is left untouched.
    kwargs.setdefault('precious', False)
    kwargs.setdefault('expireAfterUse', True)
    super().__init__(*args, **kwargs)

def initialize(self, **kwargs):
args = []
if "pydata" in kwargs:
Expand Down
72 changes: 42 additions & 30 deletions daliuge-engine/dlg/lifecycle/dlm.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ class DataLifecycleManagerBackgroundTask(threading.Thread):
signaled to stop
"""

def __init__(self, dlm, period, finishedEvent):
def __init__(self, name, dlm, period):
threading.Thread.__init__(self, name="DLMBackgroundTask")
self._dlm = dlm
self._period = period
self._finishedEvent = finishedEvent
logger.info("Starting %s running every %.3f [s]", name, self._period)

def run(self):
ev = self._finishedEvent
ev = self._dlm._finishedEvent
dlm = self._dlm
p = self._period
while True:
Expand Down Expand Up @@ -204,47 +204,58 @@ def handleEvent(self, event):
self._dlm.handleCompletedDrop(event.uid)


class DataLifecycleManager(object):
def __init__(self, **kwargs):
self._hsm = manager.HierarchicalStorageManager()
class DataLifecycleManager:
"""
An object that deals with automatic data drop replication and deletion.
"""

def __init__(self, check_period=0, cleanup_period=0, enable_drop_replication=False):
self._reg = registry.InMemoryRegistry()
self._listener = DropEventListener(self)
self._enable_drop_replication = enable_drop_replication
if enable_drop_replication:
self._hsm = manager.HierarchicalStorageManager()
else:
self._hsm = None

# TODO: When iterating over the values of _drops we always do _drops.values()
# instead of _drops.itervalues() to get a full, thread-safe copy of the
# dictionary values. Maybe there's a better approach for thread-safety
# here
self._drops = {}

self._checkPeriod = 10
if "checkPeriod" in kwargs:
self._checkPeriod = float(kwargs["checkPeriod"])

self._cleanupPeriod = 10 * self._checkPeriod
if "cleanupPeriod" in kwargs:
self._cleanupPeriod = float(kwargs["cleanupPeriod"])
self._check_period = check_period
self._cleanup_period = cleanup_period
self._drop_checker = None
self._drop_garbage_collector = None
self._finishedEvent = threading.Event()

def startup(self):
# Spawn the background threads
finishedEvent = threading.Event()
dropChecker = DROPChecker(self, self._checkPeriod, finishedEvent)
dropChecker.start()
dropGarbageCollector = DROPGarbageCollector(
self, self._cleanupPeriod, finishedEvent
)
dropGarbageCollector.start()

self._dropChecker = dropChecker
self._dropGarbageCollector = dropGarbageCollector
self._finishedEvent = finishedEvent
if self._check_period:
self._drop_checker = DROPChecker(
"DropChecker",
self,
self._check_period
)
self._drop_checker.start()
if self._cleanup_period:
self._drop_garbage_collector = DROPGarbageCollector(
"DropGarbageCollector",
self,
self._cleanup_period
)
self._drop_garbage_collector.start()

def cleanup(self):
logger.info("Cleaning up the DLM")

# Join the background threads
self._finishedEvent.set()
self._dropChecker.join()
self._dropGarbageCollector.join()
if self._drop_checker:
self._drop_checker.join()
if self._drop_garbage_collector:
self._drop_garbage_collector.join()

# Unsubscribe from all events coming from the DROPs
for drop in self._drops.values():
Expand Down Expand Up @@ -281,10 +292,8 @@ def expireCompletedDrops(self):
# are finished using this DROP
if drop.expireAfterUse:
allDone = all(
[
c.execStatus in [AppDROPStates.FINISHED, AppDROPStates.ERROR]
for c in drop.consumers
]
c.execStatus in [AppDROPStates.FINISHED, AppDROPStates.ERROR]
for c in drop.consumers
)
if not allDone:
continue
Expand Down Expand Up @@ -459,6 +468,9 @@ def handleCompletedDrop(self, uid):
# Check the kind of storage used by this DROP. If it's already persisted
# in a persistent storage media we don't need to save it again

if not self._enable_drop_replication:
return

drop = self._drops[uid]
if drop.precious and self.isReplicable(drop):
logger.debug("Replicating %r because it's precious", drop)
Expand Down
31 changes: 28 additions & 3 deletions daliuge-engine/dlg/manager/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,24 @@ def dlgNM(parser, args):
"--no-dlm",
action="store_true",
dest="noDLM",
help="Don't start the Data Lifecycle Manager on this NodeManager",
default=True,
help="(DEPRECATED) Don't start the Data Lifecycle Manager on this NodeManager",
)
parser.add_option(
"--dlm-check-period",
type="float",
help="Time in seconds between background DLM drop status checks (defaults to 10)",
default=10
)
parser.add_option(
"--dlm-cleanup-period",
type="float",
help="Time in seconds between background DLM drop automatic cleanups (defaults to 100)",
default=100
)
parser.add_option(
"--dlm-enable-replication",
action="store_true",
help="Turn on data drop automatic replication (off by default)",
)
parser.add_option(
"--dlg-path",
Expand Down Expand Up @@ -388,13 +404,22 @@ def dlgNM(parser, args):
)
(options, args) = parser.parse_args(args)

# No logging setup at this point yet
if options.noDLM:
print("WARNING: --no-dlm is deprecated, use the --dlm-* options instead")
options.dlm_check_period = 0
options.dlm_cleanup_period = 0
options.dlm_enable_replication = False

# Add DM-specific options
# Note that the host we use to expose the NodeManager itself through Pyro is
# also used to expose the Sessions it creates
options.dmType = NodeManager
options.dmArgs = ()
options.dmKwargs = {
"useDLM": not options.noDLM,
"dlm_check_period": options.dlm_check_period,
"dlm_cleanup_period": options.dlm_cleanup_period,
"dlm_enable_replication": options.dlm_enable_replication,
"dlgPath": options.dlgPath,
"host": options.host,
"error_listener": options.errorListener,
Expand Down
67 changes: 24 additions & 43 deletions daliuge-engine/dlg/manager/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,21 @@ class NodeManagerBase(DROPManager):

def __init__(
self,
useDLM=False,
dlm_check_period=0,
dlm_cleanup_period=0,
dlm_enable_replication=False,
dlgPath=None,
error_listener=None,
event_listeners=[],
max_threads=0,
logdir=utils.getDlgLogsDir(),
):

self._dlm = DataLifecycleManager() if useDLM else None
self._dlm = DataLifecycleManager(
check_period=dlm_check_period,
cleanup_period=dlm_cleanup_period,
enable_drop_replication=dlm_enable_replication
)
self._sessions = {}
self.logdir = logdir

Expand Down Expand Up @@ -180,38 +186,16 @@ def __init__(
debugging = logger.isEnabledFor(logging.DEBUG)
self._logging_event_listener = LogEvtListener() if debugging else None

# Start the mix-ins
self.start()

@abc.abstractmethod
def start(self):
"""
Starts any background task required by this Node Manager
"""
super().start()
self._dlm.startup()

def shutdown(self):
self._dlm.cleanup()
if self._threadpool:
self._threadpool.close()
self._threadpool.join()

@abc.abstractmethod
def subscribe(self, host, port):
"""
Subscribes this Node Manager to events published in from ``host``:``port``
"""

@abc.abstractmethod
def publish_event(self, evt):
"""
Publishes the event ``evt`` for other Node Managers to receive it
"""

@abc.abstractmethod
def get_rpc_client(self, hostname, port):
"""
Creates an RPC client connected to the node manager running in
``host``:``port``, and its closing method, as a 2-tuple.
"""
super().shutdown()

def deliver_event(self, evt):
"""
Expand Down Expand Up @@ -286,8 +270,7 @@ def foreach(drop):
)
drop._sessID = sessionId
self._memoryManager.register_drop(drop.uid, sessionId)
if self._dlm:
self._dlm.addDrop(drop)
self._dlm.addDrop(drop)

# Remote event forwarding
evt_listener = NMDropEventListener(self, sessionId)
Expand Down Expand Up @@ -561,28 +544,26 @@ class RpcMixIn(rpc.RPCClient, rpc.RPCServer):


# Final NodeManager class
class NodeManager(EventMixIn, RpcMixIn, NodeManagerBase):
class NodeManager(NodeManagerBase, EventMixIn, RpcMixIn):
def __init__(
self,
useDLM=True,
dlgPath=utils.getDlgPath(),
error_listener=None,
event_listeners=[],
max_threads=0,
logdir=utils.getDlgLogsDir(),
host=None,
rpc_port=constants.NODE_DEFAULT_RPC_PORT,
events_port=constants.NODE_DEFAULT_EVENTS_PORT,
*args,
**kwargs
):
# We "just know" that our RpcMixIn will have a create_context static
# method, which in reality means we are using the ZeroRPCServer class
self._context = RpcMixIn.create_context()
host = host or "127.0.0.1"
NodeManagerBase.__init__(self, *args, **kwargs)
EventMixIn.__init__(self, host, events_port)
RpcMixIn.__init__(self, host, rpc_port)
NodeManagerBase.__init__(
self, useDLM, dlgPath, error_listener, event_listeners, max_threads, logdir
)
self.start()

def start(self):
    """
    Create the RPC context and then start the rest of the node manager.

    The context is assigned to ``self._context`` before delegating to the
    parent ``start()`` chain.
    """
    # We "just know" that our RpcMixIn will have a create_context static
    # method, which in reality means we are using the ZeroRPCServer class
    # NOTE(review): presumably the context must exist before the mix-ins
    # start so that they reuse it instead of creating their own -- confirm
    # against the RPC classes.
    self._context = RpcMixIn.create_context()
    super(NodeManager, self).start()

def shutdown(self):
super(NodeManager, self).shutdown()
Expand Down
11 changes: 8 additions & 3 deletions daliuge-engine/dlg/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,20 @@ class ZeroRPCClient(RPCClientBase):

def __init__(self, *args, **kwargs):
super(ZeroRPCClient, self).__init__(*args, **kwargs)
if not hasattr(self, "_context"):
self._context = zerorpc.Context()
self._zrpcclients = {}
self._zrpcclientthreads = []
self._own_context = False
logger.debug("RPC Client created")

def __del__(self):
if self._context:
if self._own_context and self._context:
self._context.term()

def start(self):
super(ZeroRPCClient, self).start()
if not hasattr(self, "_context"):
self._context = zerorpc.Context()
self._own_context = True

# One per remote host
self._zrpcclient_acquisition_lock = threading.Lock()
Expand All @@ -128,6 +130,9 @@ def shutdown(self):
t.join(10)
if t.is_alive():
logger.warning("ZeroRPC client thread %s is still alive", t.name)
if self._own_context:
self._context.term()
self._context = None

def get_client_for_endpoint(self, host, port):

Expand Down
7 changes: 0 additions & 7 deletions daliuge-engine/dlg/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,3 @@ def start_mm_in_thread(
return self._start_manager_in_thread(
port, MasterManager, CompositeManagerRestServer, nm_hosts
)

def start_mm_in_thread(
self, nm_hosts=["127.0.0.1"], port=constants.MASTER_DEFAULT_REST_PORT
):
return self._start_manager_in_thread(
port, MasterManager, CompositeManagerRestServer, nm_hosts
)
Empty file removed daliuge-engine/pickle
Empty file.

0 comments on commit 1b8c85f

Please sign in to comment.