Skip to content

Commit

Permalink
Merge pull request #231 from ICRAR/liu-355
Browse files Browse the repository at this point in the history
The method has been split apart into 4 individual methods and the rest has then been integrated with the calling method ‘make_single_drop’. Much more clean now, but required a number of changes throughout the code to make all the tests and the graphs working again. 

Updated doxygen strings to get the new dropclass parameter for every builtin drop. ‘dropclass’ replaces all the individual *class (e.g. dataclass, appclass) parameters and made quite a bit of the code more straight forward. This had effects in many places around the code.

Updated categoryType and category to use both of them consistently throughout the code.

Changed to “name” everywhere. There had been a few places where this was not yet done.

Cleaned-up the construction of drop_spec, tried to use a dataclass, but that would have caused too many changes for little benefit.

adjusted all test graphs

adjusted all tests
  • Loading branch information
awicenec committed May 8, 2023
2 parents e48989c + 9186e10 commit 397111d
Show file tree
Hide file tree
Showing 91 changed files with 1,912 additions and 963 deletions.
4 changes: 2 additions & 2 deletions OpenAPI/tests/test.graph
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@
"defaultValue": "",
"description": "",
"keyAttribute": false,
"name": "appclass",
"name": "dropclass",
"options": [],
"positional": false,
"precious": false,
"readonly": false,
"text": "Appclass",
"text": "dropclass",
"type": "Unknown",
"value": "dlg.apps.simple.SleepAndCopyApp"
},
Expand Down
60 changes: 33 additions & 27 deletions daliuge-common/dlg/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,19 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
from enum import Enum
from dataclasses import dataclass, field, asdict
import logging

"""Common utilities used by daliuge packages"""
from .osutils import terminate_or_kill, wait_or_kill
from .network import check_port, connect_to, portIsClosed, portIsOpen, write_to
from .streams import ZlibCompressedStream, JSONStream

logger = logging.getLogger(__name__)

class DropType:
"""
Class defining the LG keyword to be used to load the module defining the Drop.
"""

DATACLASS = "dataclass"
APPCLASS = "appclass"
SOCKETCLASS = "socket"
SERVICECLASS = "serviceapp" # App drop that runs continously
CONTAINERCLASS = "container" # Drop that contains other drops


class CategoryType:
class CategoryType(str, Enum):
DATA = "Data"
APPLICATION = "Application"
CONSTRUCT = "Construct"
Expand Down Expand Up @@ -78,35 +72,47 @@ class dropdict(dict):
DROPManager.
"""

def _addSomething(self, other, key, IdText=None):
def __init__(self, init_dict=None):
if init_dict is None:
init_dict = {
"oid": None,
"categoryType": "Unknown",
}

self.update(init_dict)
if "oid" not in self:
self.update({"oid": None})
return super().__init_subclass__()

def _addSomething(self, other, key, name=None):
if key not in self:
self[key] = []
if other["oid"] not in self[key]:
# TODO: Returning just the other drop OID instead of the named
# port list is not a good solution. Required for the dask
# tests.
append = {other["oid"]: IdText} if IdText else other["oid"]
# if IdText is None:
append = {other["oid"]: name} if name else other["oid"]
# if name is None:
# raise ValueError
self[key].append(append)

def addConsumer(self, other, IdText=None):
self._addSomething(other, "consumers", IdText=IdText)
def addConsumer(self, other, name=None):
self._addSomething(other, "consumers", name=name)

def addStreamingConsumer(self, other, IdText=None):
self._addSomething(other, "streamingConsumers", IdText=IdText)
def addStreamingConsumer(self, other, name=None):
self._addSomething(other, "streamingConsumers", name=name)

def addInput(self, other, IdText=None):
self._addSomething(other, "inputs", IdText=IdText)
def addInput(self, other, name=None):
self._addSomething(other, "inputs", name=name)

def addStreamingInput(self, other, IdText=None):
self._addSomething(other, "streamingInputs", IdText=IdText)
def addStreamingInput(self, other, name=None):
self._addSomething(other, "streamingInputs", name=name)

def addOutput(self, other, IdText=None):
self._addSomething(other, "outputs", IdText=IdText)
def addOutput(self, other, name=None):
self._addSomething(other, "outputs", name=name)

def addProducer(self, other, IdText=None):
self._addSomething(other, "producers", IdText=IdText)
def addProducer(self, other, name=None):
self._addSomething(other, "producers", name=name)


def _sanitize_links(links):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from enum import Enum

from dlg.common.reproducibility.constants import ReproducibilityFlags
from dlg.common import DropType
import logging

logger = logging.getLogger("__name__")
Expand Down Expand Up @@ -167,11 +166,11 @@ def lg_block_fields(
elif category_type == "Variables":
pass
elif category_type == "Branch":
data[DropType.APPCLASS] = FieldOps.STORE
data["dropclass"] = FieldOps.STORE
elif category_type == "PythonApp":
data[DropType.APPCLASS] = FieldOps.STORE
data["dropclass"] = FieldOps.STORE
elif category_type == "Component":
data[DropType.APPCLASS] = FieldOps.STORE
data["dropclass"] = FieldOps.STORE
elif category_type == "BashShellApp":
data["Arg01"] = FieldOps.STORE
elif category_type == "Mpi":
Expand Down
13 changes: 8 additions & 5 deletions daliuge-common/dlg/restutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class ThreadingWSGIServer(

class LoggingWSGIRequestHandler(wsgiref.simple_server.WSGIRequestHandler):
def log_message(self, fmt, *args):
logger.debug(fmt, *args)
pass
# logger.debug(fmt, *args)


class RestServerWSGIServer:
Expand Down Expand Up @@ -143,7 +144,9 @@ def _get_json(self, url):
def _post_form(self, url, content=None):
if content is not None:
content = urllib.parse.urlencode(content)
ret = self._POST(url, content, content_type="application/x-www-form-urlencoded")
ret = self._POST(
url, content, content_type="application/x-www-form-urlencoded"
)
return json.load(ret) if ret else None

def _post_json(self, url, content, compress=False):
Expand Down Expand Up @@ -177,10 +180,11 @@ def _DELETE(self, url):
return stream

def _request(self, url, method, content=None, headers={}):

# Do the HTTP stuff...
url = self.url_prefix + url
logger.debug("Sending %s request to %s:%d%s", method, self.host, self.port, url)
logger.debug(
"Sending %s request to %s:%d%s", method, self.host, self.port, url
)

if not common.portIsOpen(self.host, self.port, self.timeout):
raise RestClientException(
Expand All @@ -198,7 +202,6 @@ def _request(self, url, method, content=None, headers={}):

# Server errors are encoded in the body as json content
if self._resp.status != http.HTTPStatus.OK:

msg = "Error on remote %s@%s:%s%s (status %d): " % (
method,
self.host,
Expand Down
6 changes: 4 additions & 2 deletions daliuge-engine/dlg/apps/app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ def _notifyAppIsFinished(self):
else:
self.status = DROPStates.COMPLETED
logger.debug(
"Moving %r to %s", self, "FINISHED" if not is_error else "ERROR"
"Moving %r to %s",
self.oid,
"FINISHED" if not is_error else "ERROR",
)
self._fire(
"producerFinished", status=self.status, execStatus=self.execStatus
Expand Down Expand Up @@ -424,7 +426,7 @@ def execute(self, _send_notifications=True):
# applications, for the time being they follow their execState.

# Run at most self._n_tries if there are errors during the execution
logger.debug("Executing %r", self)
logger.debug("Executing %r", self.oid)
tries = 0
drop_state = DROPStates.COMPLETED
self.execStatus = AppDROPStates.RUNNING
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/apps/archiving.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def store(self, inputDrop):
# @par EAGLE_START
# @param category PythonApp
# @param tag daliuge
# @param appclass Application Class/dlg.apps.archiving.NgasArchivingApp/String/ComponentParameter/readonly//False/False/Application class
# @param dropclass Application Class/dlg.apps.archiving.NgasArchivingApp/String/ComponentParameter/readonly//False/False/Application class
# @param input_parser Input Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
# @param output_parser Output Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/apps/bash_shell_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ def execute(self, data):
# @param command_line_arguments Command Line Arguments//String/ComponentParameter/readwrite//False/False/Additional command line arguments to be added to the command line to be executed
# @param paramValueSeparator Param value separator/ /String/ComponentParameter/readwrite//False/False/Separator character(s) between parameters on the command line
# @param argumentPrefix Argument prefix/"--"/String/ComponentParameter/readwrite//False/False/Prefix to each keyed argument on the command line
# @param dropclass dropclass/dlg.apps.bash_shell_app.BashShellApp/String/ComponentParameter/readwrite//False/False/Drop class
# @param input_parser Input Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
# @param output_parser Output Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/apps/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# @par EAGLE_START
# @param category Branch
# @param tag template
# @param appclass Application Class/dlg.apps.simple.SimpleBranch/String/ComponentParameter/readonly//False/False/Application class
# @param dropclass Application Class/dlg.apps.simple.SimpleBranch/String/ComponentParameter/readonly//False/False/Application class
# @param input_parser Input Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
# @param output_parser Output Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
Expand Down
9 changes: 9 additions & 0 deletions daliuge-engine/dlg/apps/constructs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# @param category Scatter
# @param tag template
# @param num_of_copies Scatter dimension/4/Integer/ComponentParameter/readwrite//False/False/Specifies the number of replications of the content of the scatter construct
# @param dropclass dropclass/dlg.apps.constructs.ScatterDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @par EAGLE_END
class ScatterDrop(BarrierAppDROP):
"""
Expand All @@ -25,6 +26,7 @@ class ScatterDrop(BarrierAppDROP):
# @param tag template
# @param num_of_inputs No. of inputs/2/Integer/ApplicationArgument/readwrite//False/False/Number of inputs
# @param gather_axis Index of gather axis/0/Integer/ApplicationArgument/readwrite//False/False/Index of gather axis
# @param dropclass dropclass/dlg.apps.constructs.GatherDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @par EAGLE_END
class GatherDrop(BarrierAppDROP):
"""
Expand All @@ -41,6 +43,7 @@ class GatherDrop(BarrierAppDROP):
# @param category Loop
# @param tag template
# @param num_of_iter No. of iterations/2/Integer/ApplicationArguments/readwrite//False/False/Number of iterations
# @param dropclass dropclass/dlg.apps.constructs.LoopDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @par EAGLE_END
class LoopDrop(BarrierAppDROP):
"""
Expand All @@ -57,6 +60,7 @@ class LoopDrop(BarrierAppDROP):
# @param category MKN
# @param tag template
# @param k K/1/Integer/ApplicationArgument/readwrite//False/False/Internal multiplicity
# @param dropclass dropclass/dlg.apps.constructs.MKNDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @par EAGLE_END
class MKNDrop(BarrierAppDROP):
"""
Expand All @@ -74,6 +78,7 @@ class MKNDrop(BarrierAppDROP):
# @param tag template
# @param num_of_inputs No. of inputs/2/Integer/ApplicationArgument/readwrite//False/False/Number of inputs
# @param gather_axis Index of gather axis/0/Integer/ApplicationArgument/readwrite//False/False/Index of gather axis
# @param dropclass dropclass/dlg.apps.constructs.GroupByDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @par EAGLE_END
class GroupByDrop(BarrierAppDROP):
"""
Expand All @@ -88,6 +93,7 @@ class GroupByDrop(BarrierAppDROP):
# @details A SubGraph template drop
# @par EAGLE_START
# @param category SubGraph
# @param dropclass dropclass/dlg.apps.constructs.SubGraphDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @param tag template
# @par EAGLE_END
class SubGraphDrop(BarrierAppDROP):
Expand All @@ -103,6 +109,7 @@ class SubGraphDrop(BarrierAppDROP):
# @details A comment template drop
# @par EAGLE_START
# @param category Comment
# @param dropclass dropclass/dlg.apps.constructs.CommentDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @param tag template
# @par EAGLE_END
class CommentDrop(BarrierAppDROP):
Expand All @@ -118,6 +125,7 @@ class CommentDrop(BarrierAppDROP):
# @details A description template drop
# @par EAGLE_START
# @param category Description
# @param dropclass dropclass/dlg.apps.constructs.DescriptionDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @param tag template
# @par EAGLE_END
class DescriptionDrop(BarrierAppDROP):
Expand All @@ -133,6 +141,7 @@ class DescriptionDrop(BarrierAppDROP):
# @details An Exclusive Force Node
# @par EAGLE_START
# @param category ExclusiveForceNode
# @param dropclass dropclass/dlg.apps.constructs.ExclusiveForceDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @param tag template
# @par EAGLE_END
class ExclusiveForceDrop(BarrierAppDROP):
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/apps/crc.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def run(self):
# @par EAGLE_START
# @param category PythonApp
# @param tag daliuge
# @param appclass Application Class/dlg.apps.crc.CRCStreamApp/String/ComponentParameter/readonly//False/False/Application class
# @param dropclass Application Class/dlg.apps.crc.CRCStreamApp/String/ComponentParameter/readonly//False/False/Application class
# @param input_parser Input Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
# @param output_parser Output Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/apps/dockerapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def waitForIp(self, timeout=None):
# @param command_line_arguments Command Line Arguments//String/ComponentParameter/readwrite//False/False/Additional command line arguments to be added to the command line to be executed
# @param paramValueSeparator Param value separator/ /String/ComponentParameter/readwrite//False/False/Separator character(s) between parameters and their respective values on the command line
# @param argumentPrefix Argument prefix/"--"/String/ComponentParameter/readwrite//False/False/Prefix to each keyed argument on the command line
# @param dropclass dropclass/dlg.apps.dockerapp.DockerApp/String/ComponentParameter/readwrite//False/False/Drop class
# @param input_parser Input Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
# @param output_parser Output Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
Expand Down
3 changes: 1 addition & 2 deletions daliuge-engine/dlg/apps/dynlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ def initialize(self, **kwargs):
self._c_outputs_setting_lock = threading.Lock()

def _ensure_c_outputs_are_set(self):

with self._c_outputs_setting_lock:
if self._c_outputs_set:
return
Expand Down Expand Up @@ -383,6 +382,7 @@ def generate_recompute_data(self):
# @param category DynlibApp
# @param tag template
# @param libpath Library Path//String/ComponentParameter/readwrite//False/False/The location of the shared object/DLL that implements this application
# @param dropclass dropclass/dlg.apps.dynlib.DynlibApp/String/ComponentParameter/readwrite//False/False/Drop class
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
# @param num_cpus No. of CPUs/1/Integer/ComponentParameter/readonly//False/False/Number of cores used
# @param group_start Group start/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the start of a group?
Expand Down Expand Up @@ -497,7 +497,6 @@ def initialize(self, **kwargs):
self.proc = None

def run(self):

if not hasattr(self, "_rpc_server"):
raise Exception("DynlibProcApp can only run within an RPC server")

Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/apps/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
# @param command_line_arguments Command Line Arguments//String/ComponentParameter/readwrite//False/False/Additional command line arguments to be added to the command line to be executed
# @param paramValueSeparator Param value separator/ /String/ComponentParameter/readwrite//False/False/Separator character(s) between parameters on the command line
# @param argumentPrefix Argument prefix/"--"/String/ComponentParameter/readwrite//False/False/Prefix to each keyed argument on the command line
# @param dropclass dropclass/dlg.apps.mpi.MPIApp/String/ComponentParameter/readwrite//False/False/Drop class
# @param input_parser Input Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
# @param output_parser Output Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
Expand Down
Loading

0 comments on commit 397111d

Please sign in to comment.