Skip to content

Commit

Permalink
doc update
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed Feb 2, 2022
1 parent 9edb514 commit a3f796b
Showing 1 changed file with 86 additions and 79 deletions.
165 changes: 86 additions & 79 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -891,13 +891,19 @@ def path(self):


class DataDROP(AbstractDROP):
"""The base of all data drops.
Args:
AbstractDROP ([type]): [description]
Returns:
[type]: [description]
"""
A DataDROP is a DROP that stores data for writing with
an AppDROP, or reading with one or more AppDROPs.
DataDROPs have two different modes: "normal" and "streaming".
Normal DataDROPs will wait until the COMPLETED state before being
available as input to an AppDROP, while streaming AppDROPs may be
read simutaneously with writing by chunking drop bytes together.
This class contains two methods that need to be overrwritten:
`getIO`, invoked by AppDROPs when reading or writing to a drop,
and `dataURL`, a getter for a data URI uncluding protocol and address
parsed by function `IOForURL`.
"""

def incrRefCount(self):
Expand Down Expand Up @@ -986,7 +992,6 @@ def read(self, descriptor, count=4096, **kwargs):
io = self._rios[descriptor]
return io.read(count, **kwargs)


def _checkStateAndDescriptor(self, descriptor):
if self.status != DROPStates.COMPLETED:
raise Exception(
Expand Down Expand Up @@ -1176,7 +1181,7 @@ def exists(self):
return self.getIO().exists()

@abstractmethod
def dataURL(self):
def dataURL(self) -> str:
"""
A URL that points to the data referenced by this DROP. Different
DROP implementations will use different URI schemes.
Expand Down Expand Up @@ -1876,6 +1881,66 @@ def dataURL(self):
return "plasmaflight://%s" % (binascii.hexlify(self.object_id).decode("ascii"))


##
# @brief ParameterSet
# @details A set of parameters, wholly specified in EAGLE
# @par EAGLE_START
# @param category ParameterSet
# @param[in] param/mode Parset mode/"YANDA"/String/readonly/False/To what standard DALiuGE should filter and serialize the parameters.
# @param[in] param/config_data ConfigData/""/String/readwrite/False/Additional configuration information to be mixed in with the initial data
# @param[out] port/Config ConfigFile/File/The output configuration file
# @par EAGLE_END
class ParameterSetDROP(DataDROP):
"""
A generic configuration file template wrapper
This drop opens an (optional) file containing some initial configuration information, then
appends any additional specified parameters to it, finally serving it as a data object.
"""

config_data = b''

mode = dlg_string_param('mode', None)

@abstractmethod
def serialize_parameters(self, parameters: dict, mode):
"""
Returns a string representing a serialization of the parameters.
"""
if mode == "YANDA":
# TODO: Add more complex value checking
return "\n".join(f"{x}={y}" for x, y in parameters.items())
# Add more formats (.ini for example)
return "\n".join(f"{x}={y}" for x, y in parameters.items())

@abstractmethod
def filter_parameters(self, parameters: dict, mode):
"""
Returns a dictionary of parameters, with daliuge-internal or other parameters filtered out
"""
if mode == 'YANDA':
forbidden_params = list(DEFAULT_INTERNAL_PARAMETERS)
if parameters['config_data'] == "":
forbidden_params.append('configData')
return {key: val for key, val in parameters.items() if
key not in DEFAULT_INTERNAL_PARAMETERS}
return parameters

def initialize(self, **kwargs):
"""
TODO: Open input file
"""
self.config_data = self.serialize_parameters(
self.filter_parameters(self.parameters, self.mode), self.mode).encode('utf-8')

def getIO(self):
return MemoryIO(io.BytesIO(self.config_data))

@property
def dataURL(self):
hostname = os.uname()[1]
return f"config://{hostname}/{os.getpid()}/{id(self.config_data)}"


# ===============================================================================
# AppDROP classes follow
# ===============================================================================
Expand All @@ -1884,24 +1949,24 @@ def dataURL(self):
class AppDROP(ContainerDROP):
"""
An AppDROP is a DROP representing an application that reads data
from one or more DROPs (its inputs), and writes data onto one or more
DROPs (its outputs).
from one or more DataDROPs (its inputs), and writes data onto one or more
DataDROPs (its outputs).
AppDROPs accept two different kind of inputs: "normal" and "streaming"
inputs. Normal inputs are DROPs that must be on the COMPLETED state
inputs. Normal inputs are DataDROPs that must be on the COMPLETED state
(and therefore their data must be fully written) before this application is
run, while streaming inputs are DROPs that feed chunks of data into
run, while streaming inputs are DataDROPs that feed chunks of data into
this application as the data gets written into them.
This class contains two methods that should be overwritten as needed by
subclasses: `dropCompleted`, invoked when input DROPs move to
This class contains two methods that need to be overwritten by
subclasses: `dropCompleted`, invoked when input DataDROPs move to
COMPLETED, and `dataWritten`, invoked with the data coming from streaming
inputs.
How and when applications are executed is completely up to the user, and is
not enforced by this base class. Some applications might need to be run at
`initialize` time, while other might start during the first invocation of
`dataWritten`. A common scenario anyway is to start an application only
How and when applications are executed is completely up to the app component
developer, and is not enforced by this base class. Some applications might need
to be run at `initialize` time, while other might start during the first invocation
of `dataWritten`. A common scenario anyway is to start an application only
after all its inputs have moved to COMPLETED (implying that none of them is
an streaming input); for these cases see the `BarrierAppDROP`.
"""
Expand Down Expand Up @@ -1991,13 +2056,15 @@ def handleEvent(self, e):
if e.type == "dropCompleted":
self.dropCompleted(e.uid, e.status)

@abstractmethod
def dropCompleted(self, uid, drop_state):
"""
Callback invoked when the DROP with UID `uid` (which is either a
normal or a streaming input of this AppDROP) has moved to the
COMPLETED or ERROR state. By default no action is performed.
"""

@abstractmethod
def dataWritten(self, uid, data):
"""
Callback invoked when `data` has been written into the DROP with
Expand Down Expand Up @@ -2284,66 +2351,6 @@ def execute(self, _send_notifications=True):
self._notifyAppIsFinished()


##
# @brief ParameterSet
# @details A set of parameters, wholly specified in EAGLE
# @par EAGLE_START
# @param category ParameterSet
# @param[in] param/mode Parset mode/"YANDA"/String/readonly/False/To what standard DALiuGE should filter and serialize the parameters.
# @param[in] param/config_data ConfigData/""/String/readwrite/False/Additional configuration information to be mixed in with the initial data
# @param[out] port/Config ConfigFile/File/The output configuration file
# @par EAGLE_END
class ParameterSetDROP(DataDROP):
"""
A generic configuration file template wrapper
This drop opens an (optional) file containing some initial configuration information, then
appends any additional specified parameters to it, finally serving it as a data object.
"""

config_data = b''

mode = dlg_string_param('mode', None)

@abstractmethod
def serialize_parameters(self, parameters: dict, mode):
"""
Returns a string representing a serialization of the parameters.
"""
if mode == "YANDA":
# TODO: Add more complex value checking
return "\n".join(f"{x}={y}" for x, y in parameters.items())
# Add more formats (.ini for example)
return "\n".join(f"{x}={y}" for x, y in parameters.items())

@abstractmethod
def filter_parameters(self, parameters: dict, mode):
"""
Returns a dictionary of parameters, with daliuge-internal or other parameters filtered out
"""
if mode == 'YANDA':
forbidden_params = list(DEFAULT_INTERNAL_PARAMETERS)
if parameters['config_data'] == "":
forbidden_params.append('configData')
return {key: val for key, val in parameters.items() if
key not in DEFAULT_INTERNAL_PARAMETERS}
return parameters

def initialize(self, **kwargs):
"""
TODO: Open input file
"""
self.config_data = self.serialize_parameters(
self.filter_parameters(self.parameters, self.mode), self.mode).encode('utf-8')

def getIO(self):
return MemoryIO(io.BytesIO(self.config_data))

@property
def dataURL(self):
hostname = os.uname()[1]
return f"config://{hostname}/{os.getpid()}/{id(self.config_data)}"


# Dictionary mapping 1-to-many DROPLinkType constants to the corresponding methods
# used to append a a DROP into a relationship collection of another
# (e.g., one uses `addConsumer` to add a DROPLinkeType.CONSUMER DROP into
Expand Down

0 comments on commit a3f796b

Please sign in to comment.