Skip to content

Commit

Permalink
Merge branch 'master' into liu-282
Browse files Browse the repository at this point in the history
# Conflicts:
#	daliuge-engine/dlg/apps/pyfunc.py
#	daliuge-engine/dlg/drop.py
  • Loading branch information
pritchardn committed Aug 18, 2022
2 parents 5f9ab05 + fea07cc commit d824bd5
Show file tree
Hide file tree
Showing 46 changed files with 1,178 additions and 728 deletions.
14 changes: 7 additions & 7 deletions daliuge-engine/dlg/apps/bash_shell_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,15 @@ def initialize(self, **kwargs):
super(BashShellBase, self).initialize(**kwargs)

self.proc = None
self._inputRedirect = self._getArg(kwargs, "input_redirection", "")
self._outputRedirect = self._getArg(kwargs, "output_redirection", "")
self._cmdLineArgs = self._getArg(kwargs, "command_line_arguments", "")
self._applicationArgs = self._getArg(kwargs, "applicationArgs", {})
self._argumentPrefix = self._getArg(kwargs, "argumentPrefix", "--")
self._paramValueSeparator = self._getArg(kwargs, "paramValueSeparator", " ")
self._inputRedirect = self._popArg(kwargs, "input_redirection", "")
self._outputRedirect = self._popArg(kwargs, "output_redirection", "")
self._cmdLineArgs = self._popArg(kwargs, "command_line_arguments", "")
self._applicationArgs = self._popArg(kwargs, "applicationArgs", {})
self._argumentPrefix = self._popArg(kwargs, "argumentPrefix", "--")
self._paramValueSeparator = self._popArg(kwargs, "paramValueSeparator", " ")

if not self.command:
self.command = self._getArg(kwargs, "command", None)
self.command = self._popArg(kwargs, "command", None)
if not self.command:
raise InvalidDropException(
self, "No command specified, cannot create BashShellApp"
Expand Down
36 changes: 36 additions & 0 deletions daliuge-engine/dlg/apps/branch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from dlg.drop import BarrierAppDROP, track_current_drop
from dlg.exceptions import InvalidDropException

##
# @brief Branch
# @details A conditional branch to control flow
# @par EAGLE_START
# @param category Branch
# @param tag template
# @param appclass Application Class/dlg.apps.simple.SimpleBranch/String/ComponentParameter/readonly//False/False/Application class
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
# @param num_cpus No. of CPUs/1/Integer/ComponentParameter/readonly//False/False/Number of cores used
# @param group_start Group start/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the start of a group?
# @param input_error_threshold "Input error rate (%)"/0/Integer/ComponentParameter/readwrite//False/False/the allowed failure rate of the inputs (in percent), before this component goes to ERROR state and is not executed
# @param n_tries Number of tries/1/Integer/ComponentParameter/readwrite//False/False/Specifies the number of times the 'run' method will be executed before finally giving up
# @param dummy0 dummy0//Object/OutputPort/readwrite//False/False/Dummy output port
# @param dummy1 dummy1//Object/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END
class BranchAppDrop(BarrierAppDROP):
"""
A special kind of application with exactly two outputs. After normal
execution, the application decides whether a certain condition is met.
If the condition is met, the first output is considered as COMPLETED,
while the other is moved to SKIPPED state, and vice-versa.
"""

@track_current_drop
def execute(self, _send_notifications=True):
if len(self._outputs) != 2:
raise InvalidDropException(
self,
f"BranchAppDrops should have exactly 2 outputs, not {len(self._outputs)}",
)
BarrierAppDROP.execute(self, _send_notifications=False)
self.outputs[1 if self.condition() else 0].skip()
self._notifyAppIsFinished()
24 changes: 12 additions & 12 deletions daliuge-engine/dlg/apps/dockerapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,12 @@ def initialize(self, **kwargs):
self._containerLock = multiprocessing.Lock()
super().initialize(**kwargs)

self._image = self._getArg(kwargs, "image", None)
self._env = self._getArg(kwargs, "env", None)
self._cmdLineArgs = self._getArg(kwargs, "command_line_arguments", "")
self._applicationArgs = self._getArg(kwargs, "applicationArgs", {})
self._argumentPrefix = self._getArg(kwargs, "argumentPrefix", "--")
self._paramValueSeparator = self._getArg(kwargs, "paramValueSeparator", " ")
self._image = self._popArg(kwargs, "image", None)
self._env = self._popArg(kwargs, "env", None)
self._cmdLineArgs = self._popArg(kwargs, "command_line_arguments", "")
self._applicationArgs = self._popArg(kwargs, "applicationArgs", {})
self._argumentPrefix = self._popArg(kwargs, "argumentPrefix", "--")
self._paramValueSeparator = self._popArg(kwargs, "paramValueSeparator", " ")
if not self._image:
raise InvalidDropException(
self, "No docker image specified, cannot create DockerApp"
Expand All @@ -254,7 +254,7 @@ def initialize(self, **kwargs):
self._image,
)

self._command = self._getArg(kwargs, "command", None)
self._command = self._popArg(kwargs, "command", None)

self._noBash = False
if not self._command or self._command.strip()[:2] == "%%":
Expand Down Expand Up @@ -283,14 +283,14 @@ def initialize(self, **kwargs):
# might want to preserve them.
# TODO: This might be something that the data lifecycle manager could
# handle, but for the time being we do it here
self._removeContainer = self._getArg(kwargs, "removeContainer", True)
self._removeContainer = self._popArg(kwargs, "removeContainer", True)

# Ports - a comma seperated list of the host port mappings of form:
# "hostport1:containerport1, hostport2:containerport2"
self._portMappings = self._getArg(kwargs, "portMappings", "")
self._portMappings = self._popArg(kwargs, "portMappings", "")
logger.info(f"portMappings: {self._portMappings}")

self._shmSize = self._getArg(kwargs, "shmSize", "")
self._shmSize = self._popArg(kwargs, "shmSize", "")
logger.info(f"shmSize: {self._shmSize}")

# Additional volume bindings can be specified for existing files/dirs
Expand All @@ -302,7 +302,7 @@ def initialize(self, **kwargs):
f"{utils.getDlgDir()}/workspace/settings/passwd:/etc/passwd",
f"{utils.getDlgDir()}/workspace/settings/group:/etc/group",
]
additionalBindings = self._getArg(kwargs, "additionalBindings", [])
additionalBindings = self._popArg(kwargs, "additionalBindings", [])
additionalBindings = (
additionalBindings.split(",")
if isinstance(additionalBindings, str)
Expand Down Expand Up @@ -352,7 +352,7 @@ def initialize(self, **kwargs):
self._sessionId = self._dlg_session.sessionId if self._dlg_session else ""
if not self.workdir:
default_workingdir = os.path.join(utils.getDlgWorkDir(), self._sessionId)
self.workdir = self._getArg(kwargs, "workingDir", default_workingdir)
self.workdir = self._popArg(kwargs, "workingDir", default_workingdir)

c.api.close()

Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/apps/dynlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ class DynlibApp(DynlibAppBase, BarrierAppDROP):

def initialize(self, **kwargs):
super(DynlibApp, self).initialize(**kwargs)
self.ranks = self._getArg(kwargs, "rank", None)
self.ranks = self._popArg(kwargs, "rank", None)

def run(self):
input_closers = prepare_c_inputs(self._c_app, self.inputs)
Expand Down Expand Up @@ -475,7 +475,7 @@ def initialize(self, **kwargs):
if "lib" not in kwargs:
raise InvalidDropException(self, "library not specified")
self.libname = kwargs.pop("lib")
self.timeout = self._getArg(kwargs, "timeout", 600) # 10 minutes
self.timeout = self._popArg(kwargs, "timeout", 600) # 10 minutes
self.app_params = kwargs
self.proc = None

Expand Down
8 changes: 4 additions & 4 deletions daliuge-engine/dlg/apps/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ class MPIApp(BarrierAppDROP):
def initialize(self, **kwargs):
super(MPIApp, self).initialize(**kwargs)

self._command = self._getArg(kwargs, "command", None)
self._maxprocs = self._getArg(kwargs, "maxprocs", 1)
self._use_wrapper = self._getArg(kwargs, "use_wrapper", False)
self._args = self._getArg(kwargs, "args", [])
self._command = self._popArg(kwargs, "command", None)
self._maxprocs = self._popArg(kwargs, "maxprocs", 1)
self._use_wrapper = self._popArg(kwargs, "use_wrapper", False)
self._args = self._popArg(kwargs, "args", [])
if not self._command:
raise InvalidDropException(
self, "No command specified, cannot create MPIApp"
Expand Down
10 changes: 5 additions & 5 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class DropParser(Enum):
# calling the function is treated as an iterable, with each individual object
# being written to its corresponding output.
# @par EAGLE_START
# @param category PythonApp
# @param category PyFuncApp
# @param tag template
# @param appclass Application Class/dlg.apps.pyfunc.PyFuncApp/String/ComponentParameter/readonly//False/False/Application class
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
Expand Down Expand Up @@ -278,9 +278,9 @@ def initialize(self, **kwargs):
if self._dlg_session:
env.update({"DLG_SESSION_ID": self._dlg_session.sessionId})

self._applicationArgs = self._getArg(kwargs, "applicationArgs", {})
self._applicationArgs = self._popArg(kwargs, "applicationArgs", {})

self.func_code = self._getArg(kwargs, "func_code", None)
self.func_code = self._popArg(kwargs, "func_code", None)

# check for function definition arguments in applicationArgs
self.func_def_keywords = [
Expand Down Expand Up @@ -454,7 +454,7 @@ def optionalEval(x):
_dum = [appArgs.pop(k) for k in self.func_def_keywords if k in appArgs]
logger.debug("Identified keyword arguments removed: %s",
[i['text'] for i in _dum])
pargsDict.update({k:self.parameters[k] for k in pargsDict if k in
pargsDict.update({k:self.parameters[k] for k in pargsDict if k in
self.parameters})
# if defined in both we use AppArgs values
pargsDict.update({k:appArgs[k]['value'] for k in pargsDict if k
Expand All @@ -463,7 +463,7 @@ def optionalEval(x):
else:
appArgs = {}

if ('inputs' in self.parameters and
if ('inputs' in self.parameters and
droputils.check_ports_dict(self.parameters['inputs'])):
check_len = min(len(inputs),self.fn_nargs+
len(self.arguments.kwonlyargs))
Expand Down
7 changes: 3 additions & 4 deletions daliuge-engine/dlg/apps/scp.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@
from dlg.remote import copyTo, copyFrom
from dlg.drop import (
BarrierAppDROP,
NgasDROP,
InMemoryDROP,
SharedMemoryDROP,
NullDROP,
RDBMSDrop,
ContainerDROP,
)
from dlg.data.rdbms import RDBMSDrop
from dlg.data.memory import InMemoryDROP, SharedMemoryDROP
from dlg.data.ngas import NgasDROP
from dlg.meta import (
dlg_string_param,
dlg_float_param,
Expand Down
69 changes: 65 additions & 4 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
import numpy as np

from dlg import droputils, utils
from dlg.drop import BarrierAppDROP, BranchAppDrop, ContainerDROP
from dlg.drop import BarrierAppDROP, ContainerDROP
from dlg.apps.branch import BranchAppDrop
from dlg.meta import (
dlg_float_param,
dlg_string_param,
Expand Down Expand Up @@ -89,7 +90,7 @@ class PythonApp(BarrierAppDROP):
# without executing real algorithms. Very useful for debugging.
# @par EAGLE_START
# @param category PythonApp
# @param tag template
# @param tag daliuge
# @param sleepTime Sleep Time/5/Integer/ApplicationArgument/readwrite//False/False/The number of seconds to sleep
# @param appclass Application Class/dlg.apps.simple.SleepApp/String/ComponentParameter/readonly//False/False/Application class
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
Expand Down Expand Up @@ -701,7 +702,7 @@ class SimpleBranch(BranchAppDrop, NullBarrierApp):
"""Simple branch app that is told the result of its condition"""

def initialize(self, **kwargs):
self.result = self._getArg(kwargs, "result", True)
self.result = self._popArg(kwargs, "result", True)
BranchAppDrop.initialize(self, **kwargs)

def run(self):
Expand All @@ -711,6 +712,66 @@ def condition(self):
return self.result


##
# @brief PickOne
# @details App that picks the first element of an input list, passes that
# to all outputs, except the first one. The first output is used to pass
# the remaining array on. This app is useful for a loop.
#
# @par EAGLE_START
# @param category PythonApp
# @param appclass Application Class/dlg_example_cmpts.apps.PickOne/String/ComponentParameter/readonly//False/False/Import path for application class
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time # noqa: E501
# @param num_cpus No. of CPUs/1/Integer/ComponentParameter/readonly//False/False/Number of cores used # noqa: E501
# @param rest_array rest_array//Object.array/InputPort/readwrite//False/FalseList of elements
# @param element element//Object.element/OutputPort/readwrite//False/False/Port carrying the first element of input array
# @param rest_array rest_array//Object.array/OutputPort/readwrite//False/False/Port carrying the rest array
# @par EAGLE_END
class PickOne(BarrierAppDROP):
"""
Simple app picking one element at a time. Good for Loops.
"""

def initialize(self, **kwargs):
BarrierAppDROP.initialize(self, **kwargs)

def readData(self):
input = self.inputs[0]
data = pickle.loads(droputils.allDropContents(input))

# make sure we always have a ndarray with at least 1dim.
if type(data) not in (list, tuple) and not isinstance(
data, (np.ndarray)
):
raise TypeError
if isinstance(data, np.ndarray) and data.ndim == 0:
data = np.array([data])
else:
data = np.array(data)
value = data[0] if len(data) else None
rest = data[1:] if len(data) > 1 else np.array([])
return value, rest

def writeData(self, value, rest):
"""
Prepare the data and write to all outputs
"""
# write rest to array output
# and value to every other output
for output in self.outputs:
if output.name == "rest_array":
d = pickle.dumps(rest)
output.len = len(d)
else:
d = pickle.dumps(value)
output.len = len(d)
output.write(d)

def run(self):
value, rest = self.readData()
self.writeData(value, rest)


##
# @brief ListAppendThrashingApp
# @details A testing APP that appends a random integer to a list num times.
Expand Down Expand Up @@ -749,7 +810,7 @@ class ListAppendThrashingApp(BarrierAppDROP):
)

def initialize(self, **kwargs):
self.size = self._getArg(kwargs, "size", 100)
self.size = self._popArg(kwargs, "size", 100)
self.marray = []
super(ListAppendThrashingApp, self).initialize(**kwargs)

Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/dask_emulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ class ResultTransmitter(BarrierAppDROP):

def initialize(self, **kwargs):
BarrierAppDROP.initialize(self, input_error_threshold=100, **kwargs)
self.host = self._getArg(kwargs, "host", "127.0.0.1")
self.port = self._getArg(kwargs, "port", None)
self.host = self._popArg(kwargs, "host", "127.0.0.1")
self.port = self._popArg(kwargs, "port", None)
if self.port is None:
raise InvalidDropException(self, "Missing port parameter")

Expand Down
44 changes: 44 additions & 0 deletions daliuge-engine/dlg/data/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia, 2015
# Copyright by UWA (in the framework of the ICRAR)
# All rights reserved
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
"""
This package contains several general-purpose data stores in form of
DROPs that we have developed as examples and for real-life use. Most of them
are based on the :class:`DataDROP`.
"""

__all__ = [
"DirectoryContainer",
"FileDROP",
"InMemoryDROP",
"SharedMemoryDROP",
"NgasDROP",
"RDBMSDrop",
"PlasmaDROP",
"PlasmaFlightDROP",
]

from dlg.data.directorycontainer import DirectoryContainer
from dlg.data.file import FileDROP
from dlg.data.memory import InMemoryDROP, SharedMemoryDROP
from dlg.data.ngas import NgasDROP
from dlg.data.plasma import PlasmaDROP, PlasmaFlightDROP
from dlg.data.rdbms import RDBMSDrop
Loading

0 comments on commit d824bd5

Please sign in to comment.