From a3f796b08f1d71e5c3d9e546377713c4725d3c8a Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Wed, 2 Feb 2022 16:29:07 +0800 Subject: [PATCH] doc update --- daliuge-engine/dlg/drop.py | 165 +++++++++++++++++++------------------ 1 file changed, 86 insertions(+), 79 deletions(-) diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index e7cc109fa..b804a6b4f 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -891,13 +891,19 @@ def path(self): class DataDROP(AbstractDROP): - """The base of all data drops. - - Args: - AbstractDROP ([type]): [description] - - Returns: - [type]: [description] + """ + A DataDROP is a DROP that stores data for writing with + an AppDROP, or reading with one or more AppDROPs. + + DataDROPs have two different modes: "normal" and "streaming". + Normal DataDROPs will wait until the COMPLETED state before being + available as input to an AppDROP, while streaming DataDROPs may be + read simultaneously with writing by chunking drop bytes together. + + This class contains two methods that need to be overwritten: + `getIO`, invoked by AppDROPs when reading or writing to a drop, + and `dataURL`, a getter for a data URI including protocol and address + parsed by function `IOForURL`. """ def incrRefCount(self): @@ -986,7 +992,6 @@ def read(self, descriptor, count=4096, **kwargs): io = self._rios[descriptor] return io.read(count, **kwargs) - def _checkStateAndDescriptor(self, descriptor): if self.status != DROPStates.COMPLETED: raise Exception( @@ -1176,7 +1181,7 @@ def exists(self): return self.getIO().exists() @abstractmethod - def dataURL(self): + def dataURL(self) -> str: """ A URL that points to the data referenced by this DROP. Different DROP implementations will use different URI schemes. 
@@ -1876,6 +1881,66 @@ def dataURL(self): return "plasmaflight://%s" % (binascii.hexlify(self.object_id).decode("ascii")) +## +# @brief ParameterSet +# @details A set of parameters, wholly specified in EAGLE +# @par EAGLE_START +# @param category ParameterSet +# @param[in] param/mode Parset mode/"YANDA"/String/readonly/False/To what standard DALiuGE should filter and serialize the parameters. +# @param[in] param/config_data ConfigData/""/String/readwrite/False/Additional configuration information to be mixed in with the initial data +# @param[out] port/Config ConfigFile/File/The output configuration file +# @par EAGLE_END +class ParameterSetDROP(DataDROP): + """ + A generic configuration file template wrapper + This drop opens an (optional) file containing some initial configuration information, then + appends any additional specified parameters to it, finally serving it as a data object. + """ + + config_data = b'' + + mode = dlg_string_param('mode', None) + + @abstractmethod + def serialize_parameters(self, parameters: dict, mode): + """ + Returns a string representing a serialization of the parameters. 
+ """ + if mode == "YANDA": + # TODO: Add more complex value checking + return "\n".join(f"{x}={y}" for x, y in parameters.items()) + # Add more formats (.ini for example) + return "\n".join(f"{x}={y}" for x, y in parameters.items()) + + @abstractmethod + def filter_parameters(self, parameters: dict, mode): + """ + Returns a dictionary of parameters, with daliuge-internal or other parameters filtered out + """ + if mode == 'YANDA': + forbidden_params = list(DEFAULT_INTERNAL_PARAMETERS) + if parameters['config_data'] == "": + forbidden_params.append('configData') + return {key: val for key, val in parameters.items() if + key not in DEFAULT_INTERNAL_PARAMETERS} + return parameters + + def initialize(self, **kwargs): + """ + TODO: Open input file + """ + self.config_data = self.serialize_parameters( + self.filter_parameters(self.parameters, self.mode), self.mode).encode('utf-8') + + def getIO(self): + return MemoryIO(io.BytesIO(self.config_data)) + + @property + def dataURL(self): + hostname = os.uname()[1] + return f"config://{hostname}/{os.getpid()}/{id(self.config_data)}" + + # =============================================================================== # AppDROP classes follow # =============================================================================== @@ -1884,24 +1949,24 @@ def dataURL(self): class AppDROP(ContainerDROP): """ An AppDROP is a DROP representing an application that reads data - from one or more DROPs (its inputs), and writes data onto one or more - DROPs (its outputs). + from one or more DataDROPs (its inputs), and writes data onto one or more + DataDROPs (its outputs). AppDROPs accept two different kind of inputs: "normal" and "streaming" - inputs. Normal inputs are DROPs that must be on the COMPLETED state + inputs. 
Normal inputs are DataDROPs that must be on the COMPLETED state (and therefore their data must be fully written) before this application is - run, while streaming inputs are DROPs that feed chunks of data into + run, while streaming inputs are DataDROPs that feed chunks of data into this application as the data gets written into them. - This class contains two methods that should be overwritten as needed by - subclasses: `dropCompleted`, invoked when input DROPs move to + This class contains two methods that need to be overwritten by + subclasses: `dropCompleted`, invoked when input DataDROPs move to COMPLETED, and `dataWritten`, invoked with the data coming from streaming inputs. - How and when applications are executed is completely up to the user, and is - not enforced by this base class. Some applications might need to be run at - `initialize` time, while other might start during the first invocation of - `dataWritten`. A common scenario anyway is to start an application only + How and when applications are executed is completely up to the app component + developer, and is not enforced by this base class. Some applications might need + to be run at `initialize` time, while others might start during the first invocation + of `dataWritten`. A common scenario anyway is to start an application only after all its inputs have moved to COMPLETED (implying that none of them is an streaming input); for these cases see the `BarrierAppDROP`. """ @@ -1991,6 +2056,7 @@ def handleEvent(self, e): if e.type == "dropCompleted": self.dropCompleted(e.uid, e.status) + @abstractmethod def dropCompleted(self, uid, drop_state): """ Callback invoked when the DROP with UID `uid` (which is either a @@ -1998,6 +2064,7 @@ def dropCompleted(self, uid, drop_state): COMPLETED or ERROR state. By default no action is performed. 
""" + @abstractmethod def dataWritten(self, uid, data): """ Callback invoked when `data` has been written into the DROP with @@ -2284,66 +2351,6 @@ def execute(self, _send_notifications=True): self._notifyAppIsFinished() -## -# @brief ParameterSet -# @details A set of parameters, wholly specified in EAGLE -# @par EAGLE_START -# @param category ParameterSet -# @param[in] param/mode Parset mode/"YANDA"/String/readonly/False/To what standard DALiuGE should filter and serialize the parameters. -# @param[in] param/config_data ConfigData/""/String/readwrite/False/Additional configuration information to be mixed in with the initial data -# @param[out] port/Config ConfigFile/File/The output configuration file -# @par EAGLE_END -class ParameterSetDROP(DataDROP): - """ - A generic configuration file template wrapper - This drop opens an (optional) file containing some initial configuration information, then - appends any additional specified parameters to it, finally serving it as a data object. - """ - - config_data = b'' - - mode = dlg_string_param('mode', None) - - @abstractmethod - def serialize_parameters(self, parameters: dict, mode): - """ - Returns a string representing a serialization of the parameters. 
- """ - if mode == "YANDA": - # TODO: Add more complex value checking - return "\n".join(f"{x}={y}" for x, y in parameters.items()) - # Add more formats (.ini for example) - return "\n".join(f"{x}={y}" for x, y in parameters.items()) - - @abstractmethod - def filter_parameters(self, parameters: dict, mode): - """ - Returns a dictionary of parameters, with daliuge-internal or other parameters filtered out - """ - if mode == 'YANDA': - forbidden_params = list(DEFAULT_INTERNAL_PARAMETERS) - if parameters['config_data'] == "": - forbidden_params.append('configData') - return {key: val for key, val in parameters.items() if - key not in DEFAULT_INTERNAL_PARAMETERS} - return parameters - - def initialize(self, **kwargs): - """ - TODO: Open input file - """ - self.config_data = self.serialize_parameters( - self.filter_parameters(self.parameters, self.mode), self.mode).encode('utf-8') - - def getIO(self): - return MemoryIO(io.BytesIO(self.config_data)) - - @property - def dataURL(self): - hostname = os.uname()[1] - return f"config://{hostname}/{os.getpid()}/{id(self.config_data)}" - - # Dictionary mapping 1-to-many DROPLinkType constants to the corresponding methods # used to append a a DROP into a relationship collection of another # (e.g., one uses `addConsumer` to add a DROPLinkeType.CONSUMER DROP into