Skip to content

Commit

Permalink
Merge 9186e10 into e48989c
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed May 8, 2023
2 parents e48989c + 9186e10 commit c79f28c
Show file tree
Hide file tree
Showing 91 changed files with 1,912 additions and 963 deletions.
4 changes: 2 additions & 2 deletions OpenAPI/tests/test.graph
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@
"defaultValue": "",
"description": "",
"keyAttribute": false,
"name": "appclass",
"name": "dropclass",
"options": [],
"positional": false,
"precious": false,
"readonly": false,
"text": "Appclass",
"text": "dropclass",
"type": "Unknown",
"value": "dlg.apps.simple.SleepAndCopyApp"
},
Expand Down
60 changes: 33 additions & 27 deletions daliuge-common/dlg/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,19 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
from enum import Enum
from dataclasses import dataclass, field, asdict
import logging

"""Common utilities used by daliuge packages"""
from .osutils import terminate_or_kill, wait_or_kill
from .network import check_port, connect_to, portIsClosed, portIsOpen, write_to
from .streams import ZlibCompressedStream, JSONStream

logger = logging.getLogger(__name__)

class DropType:
"""
Class defining the LG keyword to be used to load the module defining the Drop.
"""

DATACLASS = "dataclass"
APPCLASS = "appclass"
SOCKETCLASS = "socket"
SERVICECLASS = "serviceapp" # App drop that runs continously
CONTAINERCLASS = "container" # Drop that contains other drops


class CategoryType:
class CategoryType(str, Enum):
DATA = "Data"
APPLICATION = "Application"
CONSTRUCT = "Construct"
Expand Down Expand Up @@ -78,35 +72,47 @@ class dropdict(dict):
DROPManager.
"""

def _addSomething(self, other, key, IdText=None):
def __init__(self, init_dict=None):
if init_dict is None:
init_dict = {
"oid": None,
"categoryType": "Unknown",
}

self.update(init_dict)
if "oid" not in self:
self.update({"oid": None})
return super().__init_subclass__()

def _addSomething(self, other, key, name=None):
if key not in self:
self[key] = []
if other["oid"] not in self[key]:
# TODO: Returning just the other drop OID instead of the named
# port list is not a good solution. Required for the dask
# tests.
append = {other["oid"]: IdText} if IdText else other["oid"]
# if IdText is None:
append = {other["oid"]: name} if name else other["oid"]
# if name is None:
# raise ValueError
self[key].append(append)

def addConsumer(self, other, IdText=None):
self._addSomething(other, "consumers", IdText=IdText)
def addConsumer(self, other, name=None):
self._addSomething(other, "consumers", name=name)

def addStreamingConsumer(self, other, IdText=None):
self._addSomething(other, "streamingConsumers", IdText=IdText)
def addStreamingConsumer(self, other, name=None):
self._addSomething(other, "streamingConsumers", name=name)

def addInput(self, other, IdText=None):
self._addSomething(other, "inputs", IdText=IdText)
def addInput(self, other, name=None):
self._addSomething(other, "inputs", name=name)

def addStreamingInput(self, other, IdText=None):
self._addSomething(other, "streamingInputs", IdText=IdText)
def addStreamingInput(self, other, name=None):
self._addSomething(other, "streamingInputs", name=name)

def addOutput(self, other, IdText=None):
self._addSomething(other, "outputs", IdText=IdText)
def addOutput(self, other, name=None):
self._addSomething(other, "outputs", name=name)

def addProducer(self, other, IdText=None):
self._addSomething(other, "producers", IdText=IdText)
def addProducer(self, other, name=None):
self._addSomething(other, "producers", name=name)


def _sanitize_links(links):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from enum import Enum

from dlg.common.reproducibility.constants import ReproducibilityFlags
from dlg.common import DropType
import logging

logger = logging.getLogger("__name__")
Expand Down Expand Up @@ -167,11 +166,11 @@ def lg_block_fields(
elif category_type == "Variables":
pass
elif category_type == "Branch":
data[DropType.APPCLASS] = FieldOps.STORE
data["dropclass"] = FieldOps.STORE
elif category_type == "PythonApp":
data[DropType.APPCLASS] = FieldOps.STORE
data["dropclass"] = FieldOps.STORE
elif category_type == "Component":
data[DropType.APPCLASS] = FieldOps.STORE
data["dropclass"] = FieldOps.STORE
elif category_type == "BashShellApp":
data["Arg01"] = FieldOps.STORE
elif category_type == "Mpi":
Expand Down
13 changes: 8 additions & 5 deletions daliuge-common/dlg/restutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class ThreadingWSGIServer(

class LoggingWSGIRequestHandler(wsgiref.simple_server.WSGIRequestHandler):
def log_message(self, fmt, *args):
logger.debug(fmt, *args)
pass
# logger.debug(fmt, *args)


class RestServerWSGIServer:
Expand Down Expand Up @@ -143,7 +144,9 @@ def _get_json(self, url):
def _post_form(self, url, content=None):
if content is not None:
content = urllib.parse.urlencode(content)
ret = self._POST(url, content, content_type="application/x-www-form-urlencoded")
ret = self._POST(
url, content, content_type="application/x-www-form-urlencoded"
)
return json.load(ret) if ret else None

def _post_json(self, url, content, compress=False):
Expand Down Expand Up @@ -177,10 +180,11 @@ def _DELETE(self, url):
return stream

def _request(self, url, method, content=None, headers={}):

# Do the HTTP stuff...
url = self.url_prefix + url
logger.debug("Sending %s request to %s:%d%s", method, self.host, self.port, url)
logger.debug(
"Sending %s request to %s:%d%s", method, self.host, self.port, url
)

if not common.portIsOpen(self.host, self.port, self.timeout):
raise RestClientException(
Expand All @@ -198,7 +202,6 @@ def _request(self, url, method, content=None, headers={}):

# Server errors are encoded in the body as json content
if self._resp.status != http.HTTPStatus.OK:

msg = "Error on remote %s@%s:%s%s (status %d): " % (
method,
self.host,
Expand Down
6 changes: 4 additions & 2 deletions daliuge-engine/dlg/apps/app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ def _notifyAppIsFinished(self):
else:
self.status = DROPStates.COMPLETED
logger.debug(
"Moving %r to %s", self, "FINISHED" if not is_error else "ERROR"
"Moving %r to %s",
self.oid,
"FINISHED" if not is_error else "ERROR",
)
self._fire(
"producerFinished", status=self.status, execStatus=self.execStatus
Expand Down Expand Up @@ -424,7 +426,7 @@ def execute(self, _send_notifications=True):
# applications, for the time being they follow their execState.

# Run at most self._n_tries if there are errors during the execution
logger.debug("Executing %r", self)
logger.debug("Executing %r", self.oid)
tries = 0
drop_state = DROPStates.COMPLETED
self.execStatus = AppDROPStates.RUNNING
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/apps/archiving.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def store(self, inputDrop):
# @par EAGLE_START
# @param category PythonApp
# @param tag daliuge
# @param appclass Application Class/dlg.apps.archiving.NgasArchivingApp/String/ComponentParameter/readonly//False/False/Application class
# @param dropclass Application Class/dlg.apps.archiving.NgasArchivingApp/String/ComponentParameter/readonly//False/False/Application class
# @param input_parser Input Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
# @param output_parser Output Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/apps/bash_shell_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ def execute(self, data):
# @param command_line_arguments Command Line Arguments//String/ComponentParameter/readwrite//False/False/Additional command line arguments to be added to the command line to be executed
# @param paramValueSeparator Param value separator/ /String/ComponentParameter/readwrite//False/False/Separator character(s) between parameters on the command line
# @param argumentPrefix Argument prefix/"--"/String/ComponentParameter/readwrite//False/False/Prefix to each keyed argument on the command line
# @param dropclass dropclass/dlg.apps.bash_shell_app.BashShellApp/String/ComponentParameter/readwrite//False/False/Drop class
# @param input_parser Input Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
# @param output_parser Output Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/apps/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# @par EAGLE_START
# @param category Branch
# @param tag template
# @param appclass Application Class/dlg.apps.simple.SimpleBranch/String/ComponentParameter/readonly//False/False/Application class
# @param dropclass Application Class/dlg.apps.simple.SimpleBranch/String/ComponentParameter/readonly//False/False/Application class
# @param input_parser Input Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
# @param output_parser Output Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
Expand Down
9 changes: 9 additions & 0 deletions daliuge-engine/dlg/apps/constructs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# @param category Scatter
# @param tag template
# @param num_of_copies Scatter dimension/4/Integer/ComponentParameter/readwrite//False/False/Specifies the number of replications of the content of the scatter construct
# @param dropclass dropclass/dlg.apps.constructs.ScatterDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @par EAGLE_END
class ScatterDrop(BarrierAppDROP):
"""
Expand All @@ -25,6 +26,7 @@ class ScatterDrop(BarrierAppDROP):
# @param tag template
# @param num_of_inputs No. of inputs/2/Integer/ApplicationArgument/readwrite//False/False/Number of inputs
# @param gather_axis Index of gather axis/0/Integer/ApplicationArgument/readwrite//False/False/Index of gather axis
# @param dropclass dropclass/dlg.apps.constructs.GatherDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @par EAGLE_END
class GatherDrop(BarrierAppDROP):
"""
Expand All @@ -41,6 +43,7 @@ class GatherDrop(BarrierAppDROP):
# @param category Loop
# @param tag template
# @param num_of_iter No. of iterations/2/Integer/ApplicationArguments/readwrite//False/False/Number of iterations
# @param dropclass dropclass/dlg.apps.constructs.LoopDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @par EAGLE_END
class LoopDrop(BarrierAppDROP):
"""
Expand All @@ -57,6 +60,7 @@ class LoopDrop(BarrierAppDROP):
# @param category MKN
# @param tag template
# @param k K/1/Integer/ApplicationArgument/readwrite//False/False/Internal multiplicity
# @param dropclass dropclass/dlg.apps.constructs.MKNDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @par EAGLE_END
class MKNDrop(BarrierAppDROP):
"""
Expand All @@ -74,6 +78,7 @@ class MKNDrop(BarrierAppDROP):
# @param tag template
# @param num_of_inputs No. of inputs/2/Integer/ApplicationArgument/readwrite//False/False/Number of inputs
# @param gather_axis Index of gather axis/0/Integer/ApplicationArgument/readwrite//False/False/Index of gather axis
# @param dropclass dropclass/dlg.apps.constructs.GroupByDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @par EAGLE_END
class GroupByDrop(BarrierAppDROP):
"""
Expand All @@ -88,6 +93,7 @@ class GroupByDrop(BarrierAppDROP):
# @details A SubGraph template drop
# @par EAGLE_START
# @param category SubGraph
# @param dropclass dropclass/dlg.apps.constructs.SubGraphDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @param tag template
# @par EAGLE_END
class SubGraphDrop(BarrierAppDROP):
Expand All @@ -103,6 +109,7 @@ class SubGraphDrop(BarrierAppDROP):
# @details A comment template drop
# @par EAGLE_START
# @param category Comment
# @param dropclass dropclass/dlg.apps.constructs.CommentDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @param tag template
# @par EAGLE_END
class CommentDrop(BarrierAppDROP):
Expand All @@ -118,6 +125,7 @@ class CommentDrop(BarrierAppDROP):
# @details A description template drop
# @par EAGLE_START
# @param category Description
# @param dropclass dropclass/dlg.apps.constructs.DescriptionDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @param tag template
# @par EAGLE_END
class DescriptionDrop(BarrierAppDROP):
Expand All @@ -133,6 +141,7 @@ class DescriptionDrop(BarrierAppDROP):
# @details An Exclusive Force Node
# @par EAGLE_START
# @param category ExclusiveForceNode
# @param dropclass dropclass/dlg.apps.constructs.ExclusiveForceDrop/String/ComponentParameter/readwrite//False/False/Drop class
# @param tag template
# @par EAGLE_END
class ExclusiveForceDrop(BarrierAppDROP):
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/apps/crc.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def run(self):
# @par EAGLE_START
# @param category PythonApp
# @param tag daliuge
# @param appclass Application Class/dlg.apps.crc.CRCStreamApp/String/ComponentParameter/readonly//False/False/Application class
# @param dropclass Application Class/dlg.apps.crc.CRCStreamApp/String/ComponentParameter/readonly//False/False/Application class
# @param input_parser Input Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
# @param output_parser Output Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/apps/dockerapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def waitForIp(self, timeout=None):
# @param command_line_arguments Command Line Arguments//String/ComponentParameter/readwrite//False/False/Additional command line arguments to be added to the command line to be executed
# @param paramValueSeparator Param value separator/ /String/ComponentParameter/readwrite//False/False/Separator character(s) between parameters and their respective values on the command line
# @param argumentPrefix Argument prefix/"--"/String/ComponentParameter/readwrite//False/False/Prefix to each keyed argument on the command line
# @param dropclass dropclass/dlg.apps.dockerapp.DockerApp/String/ComponentParameter/readwrite//False/False/Drop class
# @param input_parser Input Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
# @param output_parser Output Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
Expand Down
3 changes: 1 addition & 2 deletions daliuge-engine/dlg/apps/dynlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ def initialize(self, **kwargs):
self._c_outputs_setting_lock = threading.Lock()

def _ensure_c_outputs_are_set(self):

with self._c_outputs_setting_lock:
if self._c_outputs_set:
return
Expand Down Expand Up @@ -383,6 +382,7 @@ def generate_recompute_data(self):
# @param category DynlibApp
# @param tag template
# @param libpath Library Path//String/ComponentParameter/readwrite//False/False/The location of the shared object/DLL that implements this application
# @param dropclass dropclass/dlg.apps.dynlib.DynlibApp/String/ComponentParameter/readwrite//False/False/Drop class
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
# @param num_cpus No. of CPUs/1/Integer/ComponentParameter/readonly//False/False/Number of cores used
# @param group_start Group start/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the start of a group?
Expand Down Expand Up @@ -497,7 +497,6 @@ def initialize(self, **kwargs):
self.proc = None

def run(self):

if not hasattr(self, "_rpc_server"):
raise Exception("DynlibProcApp can only run within an RPC server")

Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/apps/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
# @param command_line_arguments Command Line Arguments//String/ComponentParameter/readwrite//False/False/Additional command line arguments to be added to the command line to be executed
# @param paramValueSeparator Param value separator/ /String/ComponentParameter/readwrite//False/False/Separator character(s) between parameters on the command line
# @param argumentPrefix Argument prefix/"--"/String/ComponentParameter/readwrite//False/False/Prefix to each keyed argument on the command line
# @param dropclass dropclass/dlg.apps.mpi.MPIApp/String/ComponentParameter/readwrite//False/False/Drop class
# @param input_parser Input Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
# @param output_parser Output Parser/pickle/Select/ApplicationArgument/readwrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
Expand Down
Loading

0 comments on commit c79f28c

Please sign in to comment.