Skip to content

Commit

Permalink
Merge pull request #92 from ICRAR/yan-854
Browse files Browse the repository at this point in the history
Yan 854 -- Shared Memory DROPs
  • Loading branch information
awicenec committed Jan 13, 2022
2 parents 2b395b9 + 8306f05 commit c27a821
Show file tree
Hide file tree
Showing 11 changed files with 869 additions and 29 deletions.
2 changes: 2 additions & 0 deletions daliuge-common/dlg/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class Categories:
END = "End"

MEMORY = "Memory"
SHMEM = "SharedMemory"
FILE = "File"
NGAS = "NGAS"
NULL = "null"
Expand Down Expand Up @@ -63,6 +64,7 @@ class Categories:

STORAGE_TYPES = {
Categories.MEMORY,
Categories.SHMEM,
Categories.FILE,
Categories.NGAS,
Categories.NULL,
Expand Down
7 changes: 5 additions & 2 deletions daliuge-engine/dlg/apps/scp.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
BarrierAppDROP,
NgasDROP,
InMemoryDROP,
SharedMemoryDROP,
NullDROP,
RDBMSDrop,
ContainerDROP,
Expand Down Expand Up @@ -78,12 +79,14 @@ class ScpApp(BarrierAppDROP):
"input onto its single output via SSHs scp protocol.",
[
dlg_batch_input(
"binary/*", [NgasDROP, InMemoryDROP, NullDROP, RDBMSDrop, ContainerDROP]
"binary/*",
[NgasDROP, InMemoryDROP, SharedMemoryDROP, NullDROP, RDBMSDrop, ContainerDROP]
)
],
[
dlg_batch_output(
"binary/*", [NgasDROP, InMemoryDROP, NullDROP, RDBMSDrop, ContainerDROP]
"binary/*",
[NgasDROP, InMemoryDROP, SharedMemoryDROP, NullDROP, RDBMSDrop, ContainerDROP]
)
],
[dlg_streaming_input("binary/*")],
Expand Down
39 changes: 38 additions & 1 deletion daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"""
Module containing the core DROP classes.
"""

import string
from abc import ABCMeta, abstractmethod
import ast
import base64
Expand Down Expand Up @@ -1452,6 +1452,42 @@ def dataURL(self):
return "mem://%s/%d/%d" % (hostname, os.getpid(), id(self._buf))


class SharedMemoryDROP(AbstractDROP):
"""
A DROP that points to data stored in shared memory.
This drop is functionality equivalent to an InMemory drop running in a concurrent environment.
In this case however, the requirement for shared memory is explicit.
@WARNING Currently implemented as writing to shmem and there is no backup behaviour.
"""

def initialize(self, **kwargs):
args = []
if "pydata" in kwargs:
pydata = kwargs.pop("pydata")
if isinstance(pydata, str):
pydata = pydata.encode("utf8")
args.append(base64.b64decode(pydata))
self._buf = io.BytesIO(*args)

def getIO(self):
print(sys.version_info)
if sys.version_info >= (3, 8):
if hasattr(self, '_sessID'):
return SharedMemoryIO(self.oid, self._sessID)
else:
# Using Drop without manager, just generate a random name.
sess_id = ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
return SharedMemoryIO(self.oid, sess_id)
else:
raise NotImplementedError("Shared memory is only available with Python >= 3.8")

@property
def dataURL(self):
hostname = os.uname()[1]
return f"shmem://{hostname}/{os.getpid()}/{id(self._buf)}"


class NullDROP(AbstractDROP):
"""
A DROP that doesn't store any data.
Expand Down Expand Up @@ -2146,6 +2182,7 @@ def getIO(self):
def dataURL(self):
return "plasmaflight://%s" % (binascii.hexlify(self.object_id).decode("ascii"))


##
# @brief ParameterSet
# @details A set of parameters, wholly specified in EAGLE
Expand Down
2 changes: 2 additions & 0 deletions daliuge-engine/dlg/graph_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from .drop import (
ContainerDROP,
InMemoryDROP,
SharedMemoryDROP,
FileDROP,
NgasDROP,
LINKTYPE_NTO1_PROPERTY,
Expand All @@ -51,6 +52,7 @@

STORAGE_TYPES = {
Categories.MEMORY: InMemoryDROP,
Categories.SHMEM: SharedMemoryDROP,
Categories.FILE: FileDROP,
Categories.NGAS: NgasDROP,
Categories.NULL: NullDROP,
Expand Down
58 changes: 33 additions & 25 deletions daliuge-engine/dlg/manager/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@
from . import constants
from .drop_manager import DROPManager
from .session import Session

if sys.version_info >= (3, 8):
from .shared_memory_manager import DlgSharedMemoryManager
from .. import rpc, utils
from ..ddap_protocol import DROPStates
from ..drop import AppDROP, InMemoryDROP
from ..drop import AppDROP, InMemoryDROP, SharedMemoryDROP
from ..exceptions import (
NoSessionException,
SessionAlreadyExistsException,
DaliugeException,
)
from ..lifecycle.dlm import DataLifecycleManager


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -127,13 +127,13 @@ class NodeManagerBase(DROPManager):
__metaclass__ = abc.ABCMeta

def __init__(
self,
useDLM=False,
dlgPath=None,
error_listener=None,
event_listeners=[],
max_threads=0,
logdir=utils.getDlgLogsDir(),
self,
useDLM=False,
dlgPath=None,
error_listener=None,
event_listeners=[],
max_threads=0,
logdir=utils.getDlgLogsDir(),
):

self._dlm = DataLifecycleManager() if useDLM else None
Expand Down Expand Up @@ -165,15 +165,15 @@ def __init__(

# Start thread pool
self._threadpool = None
if max_threads == -1: # default use all CPU cores
if max_threads == -1: # default use all CPU cores
max_threads = cpu_count(logical=False)
else: # never more than 200
else: # never more than 200
max_threads = max(min(max_threads, 200), 1)
if max_threads > 1 and sys.version_info >= (3, 8):
logger.info("Initializing thread pool with %d threads", max_threads)
self._threadpool = multiprocessing.pool.ThreadPool(processes=max_threads)
if sys.version_info >= (3, 8):
self._memoryManager = DlgSharedMemoryManager()

if max_threads > 1:
logger.info("Initializing thread pool with %d threads", max_threads)
self._threadpool = multiprocessing.pool.ThreadPool(processes=max_threads)

# Event handler that only logs status changes
debugging = logger.isEnabledFor(logging.DEBUG)
Expand Down Expand Up @@ -270,6 +270,12 @@ def foreach(drop):
if isinstance(drop, InMemoryDROP):
drop._sessID = sessionId
self._memoryManager.register_drop(drop.uid, sessionId)
elif isinstance(drop, SharedMemoryDROP):
if sys.version_info < (3, 8):
raise NotImplementedError(
"Shared memory is not implemented when using Python < 3.8")
drop._sessID = sessionId
self._memoryManager.register_drop(drop.uid, sessionId)
if self._dlm:
self._dlm.addDrop(drop)

Expand Down Expand Up @@ -541,6 +547,8 @@ def _receive_events(self, sock_created):

# So far we currently support ZMQ only for event publishing
EventMixIn = ZMQPubSubMixIn


# Load the corresponding RPC classes and finish the construciton of NodeManager
class RpcMixIn(rpc.RPCClient, rpc.RPCServer):
pass
Expand All @@ -549,16 +557,16 @@ class RpcMixIn(rpc.RPCClient, rpc.RPCServer):
# Final NodeManager class
class NodeManager(EventMixIn, RpcMixIn, NodeManagerBase):
def __init__(
self,
useDLM=True,
dlgPath=utils.getDlgPath(),
error_listener=None,
event_listeners=[],
max_threads=0,
logdir=utils.getDlgLogsDir(),
host=None,
rpc_port=constants.NODE_DEFAULT_RPC_PORT,
events_port=constants.NODE_DEFAULT_EVENTS_PORT,
self,
useDLM=True,
dlgPath=utils.getDlgPath(),
error_listener=None,
event_listeners=[],
max_threads=0,
logdir=utils.getDlgLogsDir(),
host=None,
rpc_port=constants.NODE_DEFAULT_RPC_PORT,
events_port=constants.NODE_DEFAULT_EVENTS_PORT,
):
# We "just know" that our RpcMixIn will have a create_context static
# method, which in reality means we are using the ZeroRPCServer class
Expand Down
18 changes: 18 additions & 0 deletions daliuge-engine/test/manager/test_dm.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#
import copy
import os
import sys
import threading
import unittest
from time import sleep
Expand Down Expand Up @@ -611,6 +612,23 @@ def test_run_streaming_consumer_remotely2(self):
e_data = str(crc32c(a_data, 0)).encode('utf8')
self._test_runGraphInTwoNMs(g1, g2, rels, a_data, e_data, leaf_oid="E")

def test_run_invalid_shmem_graph(self):
"""
Our shared memory implementation does not support Python < 3.7
This test asserts that a graph containing shared memory drops will not run if running
in python < 3.8, and that it *does* run with python >= 3.8
"""

graph = [{"oid": "A", "type": "plain", "storage": Categories.SHMEM}]
dm = self._start_dm()
sessionID = "s1"
if sys.version_info < (3, 8):
self.assertRaises(NotImplementedError, quickDeploy, dm, sessionID, graph)
else:
quickDeploy(dm, sessionID, graph)
self.assertEqual(1, len(dm._sessions[sessionID].drops))
dm.destroySession(sessionID)


@unittest.skipIf(multiprocessing.cpu_count() < 4, "Not enough threads to test multiprocessing")
class TestDMParallel(NMTestsMixIn, unittest.TestCase):
Expand Down
16 changes: 16 additions & 0 deletions daliuge-engine/test/test_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import shutil
import sqlite3
import string
import sys
import tempfile
import subprocess

Expand All @@ -36,6 +37,7 @@
FileDROP,
AppDROP,
InMemoryDROP,
SharedMemoryDROP,
PlasmaDROP,
PlasmaFlightDROP,
NullDROP,
Expand Down Expand Up @@ -131,6 +133,20 @@ def test_dynamic_write_InMemoryDROP(self):
"""
self._test_dynamic_write_withDropType(InMemoryDROP)

@unittest.skipIf(sys.version_info < (3, 8), "Shared memory does nt work < python 3.8")
def test_write_SharedMemoryDROP(self):
"""
Test a SharedMemoryDROP with simple AppDROP (for checksum calculation)
"""
self._test_write_withDropType(SharedMemoryDROP)

@unittest.skipIf(sys.version_info < (3, 8), "Shared memory does nt work < python 3.8")
def test_dynamic_write_SharedMemoryDROP(self):
"""
Test a SharedMemoryDROP with simple AppDROP (for checksum calculation)
"""
self._test_dynamic_write_withDropType(SharedMemoryDROP)

def test_write_plasmaDROP(self):
"""
Test an PlasmaDrop and a simple AppDROP (for checksum calculation)
Expand Down
9 changes: 8 additions & 1 deletion daliuge-engine/test/test_graph_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from dlg import graph_loader
from dlg.ddap_protocol import DROPLinkType, DROPRel
from dlg.drop import InMemoryDROP, ContainerDROP, AppDROP, DirectoryContainer
from dlg.drop import InMemoryDROP, SharedMemoryDROP, ContainerDROP, AppDROP, DirectoryContainer
from dlg.common import Categories


Expand All @@ -40,6 +40,13 @@ def test_singleMemoryDrop(self):
self.assertEqual("A", a.oid)
self.assertEqual("A", a.uid)

def test_sharedMemoryDrop(self):
dropSpecList = [{"oid": "A", "type": "plain", "storage": Categories.SHMEM}]
a = graph_loader.createGraphFromDropSpecList(dropSpecList)[0]
self.assertIsInstance(a, SharedMemoryDROP)
self.assertEqual("A", a.oid)
self.assertEqual("A", a.uid)

def test_containerDrop(self):
dropSpecList = [
{"oid": "A", "type": "plain", "storage": Categories.MEMORY},
Expand Down
Loading

0 comments on commit c27a821

Please sign in to comment.