Skip to content

Commit

Permalink
Merge pull request #196 from ICRAR/LIU-93
Browse files Browse the repository at this point in the history
Liu 93 - Moving data drops to separate module
  • Loading branch information
awicenec committed Aug 16, 2022
2 parents 1127ae4 + e5575b4 commit cc1bfc9
Show file tree
Hide file tree
Showing 36 changed files with 1,097 additions and 786 deletions.
36 changes: 36 additions & 0 deletions daliuge-engine/dlg/apps/branch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from dlg.drop import BarrierAppDROP, track_current_drop
from dlg.exceptions import InvalidDropException

##
# @brief Branch
# @details A conditional branch to control flow
# @par EAGLE_START
# @param category Branch
# @param tag template
# @param appclass Application Class/dlg.apps.simple.SimpleBranch/String/ComponentParameter/readonly//False/False/Application class
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time
# @param num_cpus No. of CPUs/1/Integer/ComponentParameter/readonly//False/False/Number of cores used
# @param group_start Group start/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the start of a group?
# @param input_error_threshold "Input error rate (%)"/0/Integer/ComponentParameter/readwrite//False/False/the allowed failure rate of the inputs (in percent), before this component goes to ERROR state and is not executed
# @param n_tries Number of tries/1/Integer/ComponentParameter/readwrite//False/False/Specifies the number of times the 'run' method will be executed before finally giving up
# @param dummy0 dummy0//Object/OutputPort/readwrite//False/False/Dummy output port
# @param dummy1 dummy1//Object/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END
class BranchAppDrop(BarrierAppDROP):
"""
A special kind of application with exactly two outputs. After normal
execution, the application decides whether a certain condition is met.
If the condition is met, the first output is considered as COMPLETED,
while the other is moved to SKIPPED state, and vice-versa.
"""

@track_current_drop
def execute(self, _send_notifications=True):
if len(self._outputs) != 2:
raise InvalidDropException(
self,
f"BranchAppDrops should have exactly 2 outputs, not {len(self._outputs)}",
)
BarrierAppDROP.execute(self, _send_notifications=False)
self.outputs[1 if self.condition() else 0].skip()
self._notifyAppIsFinished()
7 changes: 3 additions & 4 deletions daliuge-engine/dlg/apps/scp.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@
from dlg.remote import copyTo, copyFrom
from dlg.drop import (
BarrierAppDROP,
NgasDROP,
InMemoryDROP,
SharedMemoryDROP,
NullDROP,
RDBMSDrop,
ContainerDROP,
)
from dlg.data.rdbms import RDBMSDrop
from dlg.data.memory import InMemoryDROP, SharedMemoryDROP
from dlg.data.ngas import NgasDROP
from dlg.meta import (
dlg_string_param,
dlg_float_param,
Expand Down
63 changes: 62 additions & 1 deletion daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
import numpy as np

from dlg import droputils, utils
from dlg.drop import BarrierAppDROP, BranchAppDrop, ContainerDROP
from dlg.drop import BarrierAppDROP, ContainerDROP
from dlg.apps.branch import BranchAppDrop
from dlg.meta import (
dlg_float_param,
dlg_string_param,
Expand Down Expand Up @@ -711,6 +712,66 @@ def condition(self):
return self.result


##
# @brief PickOne
# @details App that picks the first element of an input list, passes that
# to all outputs, except the first one. The first output is used to pass
# the remaining array on. This app is useful for a loop.
#
# @par EAGLE_START
# @param category PythonApp
# @param appclass Application Class/dlg_example_cmpts.apps.PickOne/String/ComponentParameter/readonly//False/False/Import path for application class
# @param execution_time Execution Time/5/Float/ComponentParameter/readonly//False/False/Estimated execution time # noqa: E501
# @param num_cpus No. of CPUs/1/Integer/ComponentParameter/readonly//False/False/Number of cores used # noqa: E501
# @param rest_array rest_array//Object.array/InputPort/readwrite//False/FalseList of elements
# @param element element//Object.element/OutputPort/readwrite//False/False/Port carrying the first element of input array
# @param rest_array rest_array//Object.array/OutputPort/readwrite//False/False/Port carrying the rest array
# @par EAGLE_END
class PickOne(BarrierAppDROP):
"""
Simple app picking one element at a time. Good for Loops.
"""

def initialize(self, **kwargs):
BarrierAppDROP.initialize(self, **kwargs)

def readData(self):
input = self.inputs[0]
data = pickle.loads(droputils.allDropContents(input))

# make sure we always have a ndarray with at least 1dim.
if type(data) not in (list, tuple) and not isinstance(
data, (np.ndarray)
):
raise TypeError
if isinstance(data, np.ndarray) and data.ndim == 0:
data = np.array([data])
else:
data = np.array(data)
value = data[0] if len(data) else None
rest = data[1:] if len(data) > 1 else np.array([])
return value, rest

def writeData(self, value, rest):
"""
Prepare the data and write to all outputs
"""
# write rest to array output
# and value to every other output
for output in self.outputs:
if output.name == "rest_array":
d = pickle.dumps(rest)
output.len = len(d)
else:
d = pickle.dumps(value)
output.len = len(d)
output.write(d)

def run(self):
value, rest = self.readData()
self.writeData(value, rest)


##
# @brief ListAppendThrashingApp
# @details A testing APP that appends a random integer to a list num times.
Expand Down
44 changes: 44 additions & 0 deletions daliuge-engine/dlg/data/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia, 2015
# Copyright by UWA (in the framework of the ICRAR)
# All rights reserved
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
"""
This package contains several general-purpose data stores in form of
DROPs that we have developed as examples and for real-life use. Most of them
are based on the :class:`DataDROP`.
"""

__all__ = [
"DirectoryContainer",
"FileDROP",
"InMemoryDROP",
"SharedMemoryDROP",
"NgasDROP",
"RDBMSDrop",
"PlasmaDROP",
"PlasmaFlightDROP",
]

from dlg.data.directorycontainer import DirectoryContainer
from dlg.data.file import FileDROP
from dlg.data.memory import InMemoryDROP, SharedMemoryDROP
from dlg.data.ngas import NgasDROP
from dlg.data.plasma import PlasmaDROP, PlasmaFlightDROP
from dlg.data.rdbms import RDBMSDrop
74 changes: 74 additions & 0 deletions daliuge-engine/dlg/data/directorycontainer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia
# Copyright by UWA (in the framework of the ICRAR)
# All rights reserved
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
import os
import shutil

from dlg.data.file import FileDROP
from dlg.ddap_protocol import DROPRel, DROPLinkType
from dlg.drop import PathBasedDrop, ContainerDROP
from dlg.exceptions import InvalidDropException, InvalidRelationshipException
from dlg.meta import dlg_bool_param


class DirectoryContainer(PathBasedDrop, ContainerDROP):
"""
A ContainerDROP that represents a filesystem directory. It only allows
FileDROPs and DirectoryContainers to be added as children. Children
can only be added if they are placed directly within the directory
represented by this DirectoryContainer.
"""

check_exists = dlg_bool_param("check_exists", True)

def initialize(self, **kwargs):
ContainerDROP.initialize(self, **kwargs)

if "dirname" not in kwargs:
raise InvalidDropException(
self, 'DirectoryContainer needs a "dirname" parameter'
)

directory = kwargs["dirname"]

if self.check_exists is True:
if not os.path.isdir(directory):
raise InvalidDropException(self, "%s is not a directory" % (directory))

self._path = self.get_dir(directory)

def addChild(self, child):
if isinstance(child, (FileDROP, DirectoryContainer)):
path = child.path
if os.path.dirname(path) != self.path:
raise InvalidRelationshipException(
DROPRel(child, DROPLinkType.CHILD, self),
"Child DROP is not under %s" % (self.path),
)
ContainerDROP.addChild(self, child)
else:
raise TypeError("Child DROP is not of type FileDROP or DirectoryContainer")

def delete(self):
shutil.rmtree(self._path)

def exists(self):
return os.path.isdir(self._path)

0 comments on commit cc1bfc9

Please sign in to comment.