Skip to content

Commit

Permalink
Merge 1444624 into de9d17f
Browse files Browse the repository at this point in the history
  • Loading branch information
pritchardn committed Dec 21, 2021
2 parents de9d17f + 1444624 commit 42da084
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 5 deletions.
2 changes: 2 additions & 0 deletions daliuge-common/dlg/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Categories:
S3 = "S3"
PLASMA = "Plasma"
PLASMAFLIGHT = "PlasmaFlight"
PARSET = "ParameterSet"

MKN = "MKN"
SCATTER = "Scatter"
Expand Down Expand Up @@ -69,6 +70,7 @@ class Categories:
Categories.JSON,
Categories.PLASMA,
Categories.PLASMAFLIGHT,
Categories.PARSET
}
APP_DROP_TYPES = [
Categories.COMPONENT,
Expand Down
Empty file added daliuge-engine/__init__.py
Empty file.
77 changes: 72 additions & 5 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@
PlasmaIO,
PlasmaFlightIO,
)

DEFAULT_INTERNAL_PARAMETERS = {'storage', 'rank', 'loop_cxt', 'dw', 'iid', 'dt', 'consumers',
'config_data', 'mode'}

if sys.version_info >= (3, 8):
from .io import SharedMemoryIO
from .utils import prepare_sql, createDirIfMissing, isabs, object_tracking
Expand Down Expand Up @@ -96,7 +100,6 @@

_checksumType = ChecksumTypes.CRC_32


logger = logging.getLogger(__name__)


Expand All @@ -113,6 +116,7 @@ def append(self, drop):

track_current_drop = object_tracking("drop")


# ===============================================================================
# DROP classes follow
# ===============================================================================
Expand Down Expand Up @@ -181,7 +185,6 @@ def __init__(self, oid, uid, **kwargs):

# Copy it since we're going to modify it
kwargs = dict(kwargs)

# So far only these three are mandatory
self._oid = str(oid)
self._uid = str(uid)
Expand Down Expand Up @@ -329,6 +332,9 @@ def __init__(self, oid, uid, **kwargs):
# All DROPs are precious unless stated otherwise; used for replication
self._precious = self._getArg(kwargs, "precious", True)

# Useful to have access to all EAGLE parameters without a priori knowledge
self._parameters = dict(kwargs)

# Sub-class initialization; mark ourselves as INITIALIZED after that
self.initialize(**kwargs)
self._status = (
Expand All @@ -344,7 +350,7 @@ def getmembers(object, predicate=None):

# Take a class dlg defined parameter class attribute and create an instanced attribute on object
for attr_name, obj in getmembers(
self, lambda a: not (inspect.isfunction(a) or isinstance(a, property))
self, lambda a: not (inspect.isfunction(a) or isinstance(a, property))
):
if isinstance(obj, dlg_float_param):
value = kwargs.get(attr_name, obj.default_value)
Expand Down Expand Up @@ -1123,6 +1129,10 @@ def node(self):
def dataIsland(self):
return self._dataIsland

@property
def parameters(self):
return self._parameters


class PathBasedDrop(object):
"""Base class for data drops that handle paths (i.e., file and directory drops)"""
Expand Down Expand Up @@ -1502,7 +1512,6 @@ def insert(self, vals):
"""
with self._connection() as c:
with self._cursor(c) as cur:

# vals is a dictionary, its keys are the column names and its
# values are the values to insert
sql = "INSERT into %s (%s) VALUES (%s)" % (
Expand Down Expand Up @@ -1673,7 +1682,6 @@ def exists(self):


class AppDROP(ContainerDROP):

"""
An AppDROP is a DROP representing an application that reads data
from one or more DROPs (its inputs), and writes data onto one or more
Expand Down Expand Up @@ -2138,6 +2146,65 @@ def getIO(self):
def dataURL(self):
return "plasmaflight://%s" % (binascii.hexlify(self.object_id).decode("ascii"))

##
# @brief ParameterSet
# @details A set of parameters, wholly specified in EAGLE
# @par EAGLE_START
# @param category ParameterSet
# @param[in] param/mode Parset mode/"YANDA"/String/readonly/False/To what standard DALiuGE should filter and serialize the parameters.
# @param[in] param/config_data ConfigData/""/String/readwrite/False/Additional configuration information to be mixed in with the initial data
# @param[out] port/Config ConfigFile/File/The output configuration file
# @par EAGLE_END
class ParameterSetDROP(AbstractDROP):
"""
A generic configuration file template wrapper
This drop opens an (optional) file containing some initial configuration information, then
appends any additional specified parameters to it, finally serving it as a data object.
"""

config_data = b''

mode = dlg_string_param('mode', None)

@abstractmethod
def serialize_parameters(self, parameters: dict, mode):
"""
Returns a string representing a serialization of the parameters.
"""
if mode == "YANDA":
# TODO: Add more complex value checking
return "\n".join(f"{x}={y}" for x, y in parameters.items())
# Add more formats (.ini for example)
return "\n".join(f"{x}={y}" for x, y in parameters.items())

@abstractmethod
def filter_parameters(self, parameters: dict, mode):
"""
Returns a dictionary of parameters, with daliuge-internal or other parameters filtered out
"""
if mode == 'YANDA':
forbidden_params = list(DEFAULT_INTERNAL_PARAMETERS)
if parameters['config_data'] == "":
forbidden_params.append('configData')
return {key: val for key, val in parameters.items() if
key not in DEFAULT_INTERNAL_PARAMETERS}
return parameters

def initialize(self, **kwargs):
"""
TODO: Open input file
"""
self.config_data = self.serialize_parameters(
self.filter_parameters(self.parameters, self.mode), self.mode).encode('utf-8')

def getIO(self):
return MemoryIO(io.BytesIO(self.config_data))

@property
def dataURL(self):
hostname = os.uname()[1]
return f"config://{hostname}/{os.getpid()}/{id(self.config_data)}"


# Dictionary mapping 1-to-many DROPLinkType constants to the corresponding methods
# used to append a a DROP into a relationship collection of another
Expand Down
2 changes: 2 additions & 0 deletions daliuge-engine/dlg/graph_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
EndDROP,
PlasmaDROP,
PlasmaFlightDROP,
ParameterSetDROP
)
from .exceptions import InvalidGraphException
from .json_drop import JsonDROP
Expand All @@ -57,6 +58,7 @@
Categories.JSON: JsonDROP,
Categories.PLASMA: PlasmaDROP,
Categories.PLASMAFLIGHT: PlasmaFlightDROP,
Categories.PARSET: ParameterSetDROP
}

try:
Expand Down
59 changes: 59 additions & 0 deletions daliuge-engine/test/test_ParameterSetDROP.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia, 2014
# Copyright by UWA (in the framework of the ICRAR)
# All rights reserved
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
import unittest
from dlg.drop import ParameterSetDROP
from dlg.droputils import allDropContents


class test_ParameterSetDROP(unittest.TestCase):
kwargs = {
"mode": None,
"Cimager": 2,
"StringParam": "param",
"Boolparam": True,
"config_data": "",
"iid": -1,
"rank": 0,
"consumers": ["drop-1", "drop-2"],
}

def test_initialize(self):
yanda_kwargs = dict(self.kwargs)
yanda_kwargs["mode"] = "YANDA"

yanda_parset = ParameterSetDROP(oid="a", uid="a", **yanda_kwargs)
standard_parset = ParameterSetDROP(oid="b", uid="b", **self.kwargs)

yanda_output = "Cimager=2\nStringParam=param\nBoolparam=True"
standard_output = (
"mode=None\n"
+ yanda_output
+ "\nconfig_data=\niid=-1\nrank=0\nconsumers=['drop-1', 'drop-2']"
)

yanda_parset.setCompleted()
standard_parset.setCompleted()

self.assertEqual(yanda_output, allDropContents(yanda_parset).decode("utf-8"))
self.assertEqual(
standard_output, allDropContents(standard_parset).decode("utf-8")
)
2 changes: 2 additions & 0 deletions daliuge-translator/dlg/dropmake/web/lg_web.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@

logger = logging.getLogger(__name__)

# Patched to be larger to accomodate large config drops
bottle.BaseRequest.MEMFILE_MAX = 1024 * 512

def file_as_string(fname, enc="utf8"):
b = pkg_resources.resource_string(__name__, fname) # @UndefinedVariable
Expand Down

0 comments on commit 42da084

Please sign in to comment.