Skip to content

Commit

Permalink
Merge branch 'master' into YAN-858
Browse files Browse the repository at this point in the history
Conflicts:
	daliuge-engine/dlg/apps/simple.py
  • Loading branch information
calgray committed Jan 14, 2022
2 parents 15d9131 + cc15b7d commit 0595a7b
Show file tree
Hide file tree
Showing 36 changed files with 1,192 additions and 139 deletions.
4 changes: 4 additions & 0 deletions daliuge-common/dlg/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ class Categories:
END = "End"

MEMORY = "Memory"
SHMEM = "SharedMemory"
FILE = "File"
NGAS = "NGAS"
NULL = "null"
JSON = "json"
S3 = "S3"
PLASMA = "Plasma"
PLASMAFLIGHT = "PlasmaFlight"
PARSET = "ParameterSet"

MKN = "MKN"
SCATTER = "Scatter"
Expand All @@ -62,13 +64,15 @@ class Categories:

STORAGE_TYPES = {
Categories.MEMORY,
Categories.SHMEM,
Categories.FILE,
Categories.NGAS,
Categories.NULL,
Categories.END,
Categories.JSON,
Categories.PLASMA,
Categories.PLASMAFLIGHT,
Categories.PARSET
}
APP_DROP_TYPES = [
Categories.COMPONENT,
Expand Down
Empty file added daliuge-engine/__init__.py
Empty file.
7 changes: 5 additions & 2 deletions daliuge-engine/dlg/apps/scp.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
BarrierAppDROP,
NgasDROP,
InMemoryDROP,
SharedMemoryDROP,
NullDROP,
RDBMSDrop,
ContainerDROP,
Expand Down Expand Up @@ -78,12 +79,14 @@ class ScpApp(BarrierAppDROP):
"input onto its single output via SSHs scp protocol.",
[
dlg_batch_input(
"binary/*", [NgasDROP, InMemoryDROP, NullDROP, RDBMSDrop, ContainerDROP]
"binary/*",
[NgasDROP, InMemoryDROP, SharedMemoryDROP, NullDROP, RDBMSDrop, ContainerDROP]
)
],
[
dlg_batch_output(
"binary/*", [NgasDROP, InMemoryDROP, NullDROP, RDBMSDrop, ContainerDROP]
"binary/*",
[NgasDROP, InMemoryDROP, SharedMemoryDROP, NullDROP, RDBMSDrop, ContainerDROP]
)
],
[dlg_streaming_input("binary/*")],
Expand Down
14 changes: 10 additions & 4 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,16 @@

from dlg import droputils, utils
from dlg.drop import BarrierAppDROP, BranchAppDrop, ContainerDROP
from dlg.meta import dlg_float_param, dlg_string_param
from dlg.meta import dlg_bool_param, dlg_int_param
from dlg.meta import dlg_component, dlg_batch_input
from dlg.meta import dlg_batch_output, dlg_streaming_input
from dlg.meta import (
dlg_float_param,
dlg_string_param,
dlg_bool_param,
dlg_int_param,
dlg_component,
dlg_batch_input,
dlg_batch_output,
dlg_streaming_input
)
from dlg.exceptions import DaliugeException
from dlg.apps.pyfunc import serialize_data, deserialize_data

Expand Down
116 changes: 110 additions & 6 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"""
Module containing the core DROP classes.
"""

import string
from abc import ABCMeta, abstractmethod
import ast
import base64
Expand Down Expand Up @@ -70,6 +70,10 @@
PlasmaIO,
PlasmaFlightIO,
)

DEFAULT_INTERNAL_PARAMETERS = {'storage', 'rank', 'loop_cxt', 'dw', 'iid', 'dt', 'consumers',
'config_data', 'mode'}

if sys.version_info >= (3, 8):
from .io import SharedMemoryIO
from .utils import prepare_sql, createDirIfMissing, isabs, object_tracking
Expand Down Expand Up @@ -98,7 +102,6 @@

_checksumType = ChecksumTypes.CRC_32


logger = logging.getLogger(__name__)


Expand All @@ -115,6 +118,7 @@ def append(self, drop):

track_current_drop = object_tracking("drop")


# ===============================================================================
# DROP classes follow
# ===============================================================================
Expand Down Expand Up @@ -183,7 +187,6 @@ def __init__(self, oid, uid, **kwargs):

# Copy it since we're going to modify it
kwargs = dict(kwargs)

# So far only these three are mandatory
self._oid = str(oid)
self._uid = str(uid)
Expand Down Expand Up @@ -331,6 +334,9 @@ def __init__(self, oid, uid, **kwargs):
# All DROPs are precious unless stated otherwise; used for replication
self._precious = self._getArg(kwargs, "precious", True)

# Useful to have access to all EAGLE parameters without a priori knowledge
self._parameters = dict(kwargs)

# Sub-class initialization; mark ourselves as INITIALIZED after that
self.initialize(**kwargs)
self._status = (
Expand All @@ -346,7 +352,7 @@ def getmembers(object, predicate=None):

# Take a class dlg defined parameter class attribute and create an instanced attribute on object
for attr_name, obj in getmembers(
self, lambda a: not (inspect.isfunction(a) or isinstance(a, property))
self, lambda a: not (inspect.isfunction(a) or isinstance(a, property))
):
if isinstance(obj, dlg_float_param):
value = kwargs.get(attr_name, obj.default_value)
Expand Down Expand Up @@ -1125,6 +1131,10 @@ def node(self):
def dataIsland(self):
return self._dataIsland

@property
def parameters(self):
return self._parameters


class PathBasedDrop(object):
"""Base class for data drops that handle paths (i.e., file and directory drops)"""
Expand Down Expand Up @@ -1444,6 +1454,42 @@ def dataURL(self):
return "mem://%s/%d/%d" % (hostname, os.getpid(), id(self._buf))


class SharedMemoryDROP(AbstractDROP):
"""
A DROP that points to data stored in shared memory.
This drop is functionality equivalent to an InMemory drop running in a concurrent environment.
In this case however, the requirement for shared memory is explicit.
@WARNING Currently implemented as writing to shmem and there is no backup behaviour.
"""

def initialize(self, **kwargs):
args = []
if "pydata" in kwargs:
pydata = kwargs.pop("pydata")
if isinstance(pydata, str):
pydata = pydata.encode("utf8")
args.append(base64.b64decode(pydata))
self._buf = io.BytesIO(*args)

def getIO(self):
print(sys.version_info)
if sys.version_info >= (3, 8):
if hasattr(self, '_sessID'):
return SharedMemoryIO(self.oid, self._sessID)
else:
# Using Drop without manager, just generate a random name.
sess_id = ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
return SharedMemoryIO(self.oid, sess_id)
else:
raise NotImplementedError("Shared memory is only available with Python >= 3.8")

@property
def dataURL(self):
hostname = os.uname()[1]
return f"shmem://{hostname}/{os.getpid()}/{id(self._buf)}"


class NullDROP(AbstractDROP):
"""
A DROP that doesn't store any data.
Expand Down Expand Up @@ -1504,7 +1550,6 @@ def insert(self, vals):
"""
with self._connection() as c:
with self._cursor(c) as cur:

# vals is a dictionary, its keys are the column names and its
# values are the values to insert
sql = "INSERT into %s (%s) VALUES (%s)" % (
Expand Down Expand Up @@ -1675,7 +1720,6 @@ def exists(self):


class AppDROP(ContainerDROP):

"""
An AppDROP is a DROP representing an application that reads data
from one or more DROPs (its inputs), and writes data onto one or more
Expand Down Expand Up @@ -2145,6 +2189,66 @@ def dataURL(self):
return "plasmaflight://%s" % (binascii.hexlify(self.object_id).decode("ascii"))


##
# @brief ParameterSet
# @details A set of parameters, wholly specified in EAGLE
# @par EAGLE_START
# @param category ParameterSet
# @param[in] param/mode Parset mode/"YANDA"/String/readonly/False/To what standard DALiuGE should filter and serialize the parameters.
# @param[in] param/config_data ConfigData/""/String/readwrite/False/Additional configuration information to be mixed in with the initial data
# @param[out] port/Config ConfigFile/File/The output configuration file
# @par EAGLE_END
class ParameterSetDROP(AbstractDROP):
"""
A generic configuration file template wrapper
This drop opens an (optional) file containing some initial configuration information, then
appends any additional specified parameters to it, finally serving it as a data object.
"""

config_data = b''

mode = dlg_string_param('mode', None)

@abstractmethod
def serialize_parameters(self, parameters: dict, mode):
"""
Returns a string representing a serialization of the parameters.
"""
if mode == "YANDA":
# TODO: Add more complex value checking
return "\n".join(f"{x}={y}" for x, y in parameters.items())
# Add more formats (.ini for example)
return "\n".join(f"{x}={y}" for x, y in parameters.items())

@abstractmethod
def filter_parameters(self, parameters: dict, mode):
"""
Returns a dictionary of parameters, with daliuge-internal or other parameters filtered out
"""
if mode == 'YANDA':
forbidden_params = list(DEFAULT_INTERNAL_PARAMETERS)
if parameters['config_data'] == "":
forbidden_params.append('configData')
return {key: val for key, val in parameters.items() if
key not in DEFAULT_INTERNAL_PARAMETERS}
return parameters

def initialize(self, **kwargs):
"""
TODO: Open input file
"""
self.config_data = self.serialize_parameters(
self.filter_parameters(self.parameters, self.mode), self.mode).encode('utf-8')

def getIO(self):
return MemoryIO(io.BytesIO(self.config_data))

@property
def dataURL(self):
hostname = os.uname()[1]
return f"config://{hostname}/{os.getpid()}/{id(self.config_data)}"


# Dictionary mapping 1-to-many DROPLinkType constants to the corresponding methods
# used to append a a DROP into a relationship collection of another
# (e.g., one uses `addConsumer` to add a DROPLinkeType.CONSUMER DROP into
Expand Down
4 changes: 4 additions & 0 deletions daliuge-engine/dlg/graph_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from .drop import (
ContainerDROP,
InMemoryDROP,
SharedMemoryDROP,
FileDROP,
NgasDROP,
LINKTYPE_NTO1_PROPERTY,
Expand All @@ -42,6 +43,7 @@
EndDROP,
PlasmaDROP,
PlasmaFlightDROP,
ParameterSetDROP
)
from .exceptions import InvalidGraphException
from .json_drop import JsonDROP
Expand All @@ -50,13 +52,15 @@

STORAGE_TYPES = {
Categories.MEMORY: InMemoryDROP,
Categories.SHMEM: SharedMemoryDROP,
Categories.FILE: FileDROP,
Categories.NGAS: NgasDROP,
Categories.NULL: NullDROP,
Categories.END: EndDROP,
Categories.JSON: JsonDROP,
Categories.PLASMA: PlasmaDROP,
Categories.PLASMAFLIGHT: PlasmaFlightDROP,
Categories.PARSET: ParameterSetDROP
}

try:
Expand Down
Loading

0 comments on commit 0595a7b

Please sign in to comment.