diff --git a/daliuge-engine/dlg/apps/archiving.py b/daliuge-engine/dlg/apps/archiving.py index 115e52be7..99420052f 100644 --- a/daliuge-engine/dlg/apps/archiving.py +++ b/daliuge-engine/dlg/apps/archiving.py @@ -47,7 +47,7 @@ class ExternalStoreApp(BarrierAppDROP): where it resides. """ - compontent_meta = dlg_component( + component_meta = dlg_component( "ExternalStoreApp", "An application that takes its input DROP (which must be one, and only one) " "and creates a copy of it in a completely external store, from the point " @@ -106,7 +106,7 @@ class NgasArchivingApp(ExternalStoreApp): supported by the framework, and not only filesystem objects. """ - compontent_meta = dlg_component( + component_meta = dlg_component( "NgasArchivingApp", "An ExternalStoreApp class that takes its input DROP and archives it in " "an NGAS server. It currently deals with non-container DROPs only.", diff --git a/daliuge-engine/dlg/apps/bash_shell_app.py b/daliuge-engine/dlg/apps/bash_shell_app.py index 0970b511b..b28bc3da4 100644 --- a/daliuge-engine/dlg/apps/bash_shell_app.py +++ b/daliuge-engine/dlg/apps/bash_shell_app.py @@ -312,7 +312,7 @@ class BashShellApp(BashShellBase, BarrierAppDROP): StreamingOutputBashApp for those cases. """ - compontent_meta = dlg_component( + component_meta = dlg_component( "BashShellApp", "An app that runs a bash command in batch mode", [dlg_batch_input("text/*", [])], @@ -330,7 +330,7 @@ class StreamingOutputBashApp(BashShellBase, BarrierAppDROP): next application. """ - compontent_meta = dlg_component( + component_meta = dlg_component( "StreamingOutputBashApp", "Like BashShellApp, but its stdout is a stream " "of data that is fed into the next application.", @@ -357,7 +357,7 @@ class StreamingInputBashApp(StreamingInputBashAppBase): this application off. """ - compontent_meta = dlg_component( + component_meta = dlg_component( "StreamingInputBashApp", "An app that runs a bash command that consumes data from stdin.", [dlg_batch_input("text/*", [])], @@ -377,7 +377,7 @@ class StreamingInputOutputBashApp(StreamingInputBashAppBase): fed into the next application. """ - compontent_meta = dlg_component( + component_meta = dlg_component( "StreamingInputOutputBashApp", "Like StreamingInputBashApp, but its stdout is also a " "stream of data that is fed into the next application.", diff --git a/daliuge-engine/dlg/apps/crc.py b/daliuge-engine/dlg/apps/crc.py index dbf722883..12b0335f0 100644 --- a/daliuge-engine/dlg/apps/crc.py +++ b/daliuge-engine/dlg/apps/crc.py @@ -41,7 +41,7 @@ class CRCApp(BarrierAppDROP): not something really intended to be used in a production system """ - compontent_meta = dlg_component( + component_meta = dlg_component( "CRCApp", "A BarrierAppDROP that calculates the " "CRC of the single DROP it consumes", [dlg_batch_input("binary/*", [])], @@ -72,13 +72,23 @@ def run(self): outputDrop.write(str(crc).encode("utf8")) +## +# @brief CRCStreamApp +# @details Calculate CRC in the streaming mode +# i.e. A "streamingConsumer" of its predecessor in the graph +# @par EAGLE_START +# @param category PythonApp +# @param[in] param/appclass Application Class/dlg.apps.crc.CRCStreamApp/String/readonly/ +# \~English Application class +# @param[out] port/data Data/String/ +# @par EAGLE_END class CRCStreamApp(AppDROP): """ Calculate CRC in the streaming mode i.e. A "streamingConsumer" of its predecessor in the graph """ - compontent_meta = dlg_component( + component_meta = dlg_component( "CRCStreamApp", "Calculate CRC in the streaming mode.", [dlg_batch_input("binary/*", [])], diff --git a/daliuge-engine/dlg/apps/plasma.py b/daliuge-engine/dlg/apps/plasma.py index 4bf24e7d5..928267b7d 100644 --- a/daliuge-engine/dlg/apps/plasma.py +++ b/daliuge-engine/dlg/apps/plasma.py @@ -58,7 +58,7 @@ # \~English MS output file # @par EAGLE_END class MSStreamingPlasmaConsumer(AppDROP): - compontent_meta = dlg_component( + component_meta = dlg_component( "MSStreamingPlasmaConsumer", "MS Plasma Consumer", [dlg_batch_input("binary/*", [])], @@ -145,7 +145,7 @@ def dropCompleted(self, uid, drop_state): # \~English Plasma MS output # @par EAGLE_END class MSStreamingPlasmaProducer(BarrierAppDROP): - compontent_meta = dlg_component( + component_meta = dlg_component( "MSStreamingPlasmaProducer", "MS Plasma Producer", [dlg_batch_input("binary/*", [])], diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index 52091a7a9..bc9a564d5 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -21,6 +21,7 @@ # """Module implementing the PyFuncApp class""" +import ast import base64 import collections import importlib @@ -28,12 +29,22 @@ import logging import pickle +from typing import Callable import dill -from .. import droputils, utils -from ..drop import BarrierAppDROP -from ..exceptions import InvalidDropException - +from dlg import droputils, utils +from dlg.drop import BarrierAppDROP +from dlg.exceptions import InvalidDropException +from dlg.meta import ( + dlg_bool_param, + dlg_string_param, + dlg_float_param, + dlg_dict_param, + dlg_component, + dlg_batch_input, + dlg_batch_output, + dlg_streaming_input, +) logger = logging.getLogger(__name__) @@ -57,7 +68,7 @@ def serialize_func(f): a = inspect.getfullargspec(f) if a.defaults: fdefaults = dict( - zip(a.args[-len(a.defaults) :], [serialize_data(d) for d in a.defaults]) + zip(a.args[-len(a.defaults):], [serialize_data(d) for d in a.defaults]) ) logger.debug("Defaults for function %r: %r", f, fdefaults) return fser, fdefaults @@ -86,6 +97,30 @@ def import_using_code(code): return dill.loads(code) +## +# @brief PyFuncApp +# @details An application that wraps a simple python function. +# The inputs of the application are treated as the arguments of the function. +# Conversely, the output of the function is treated as the output of the +# application. If the application has more than one output, the result of +# calling the function is treated as an iterable, with each individual object +# being written to its corresponding output. +# @par EAGLE_START +# @param category PythonApp +# @param[in] param/appclass Application Class/dlg.apps.pyfunc.PyFuncApp/String/readonly/ +# \~English Application class +# @param[in] param/func_name Function Name//String/readwrite/ +# \~English Python fuction name +# @param[in] param/func_code Function Code//String/readwrite/ +# \~English Python fuction code, e.g. 'def fuction_name(args): return args' +# @param[in] param/pickle Pickle//bool/readwrite/ +# \~English Whether the python arguments are pickled. +# @param[in] param/func_defaults Function Defaults//String/readwrite/ +# \~English Mapping from argname to default value. Should match only the last part +# of the argnames list +# @param[in] param/func_arg_mapping Function Arguments Mapping//String/readwrite/ +# \~English Mapping between argument name and input drop uids +# @par EAGLE_END class PyFuncApp(BarrierAppDROP): """ An application that wraps a simple python function. @@ -104,45 +139,76 @@ class PyFuncApp(BarrierAppDROP): Both inputs and outputs are serialized using the pickle protocol. """ + component_meta = dlg_component( + "PyFuncApp", + "Py Func App.", + [dlg_batch_input("binary/*", [])], + [dlg_batch_output("binary/*", [])], + [dlg_streaming_input("binary/*")], + ) + + func_name = dlg_string_param("func_name", None) + + # fcode = dlg_bytes_param("func_code", None) # bytes or base64 string + + pickle = dlg_bool_param("pickle", True) + + func_arg_mapping = dlg_dict_param("func_arg_mapping", {}) + + func_defaults = dlg_dict_param("func_defaults", {}) + + f: Callable + fdefaults: dict + def initialize(self, **kwargs): BarrierAppDROP.initialize(self, **kwargs) - self.fname = fname = self._getArg(kwargs, "func_name", None) - fcode = self._getArg(kwargs, "func_code", None) - if not fname and not fcode: + self.fcode = self._getArg(kwargs, "func_code", None) + if not self.func_name and not self.fcode: raise InvalidDropException( self, "No function specified (either via name or code)" ) - if not fcode: - self.f = import_using_name(self, fname) + # Lookup function or import bytecode as a function + if not self.fcode: + self.f = import_using_name(self, self.func_name) else: - if not isinstance(fcode, bytes): - fcode = base64.b64decode(fcode.encode("utf8")) - self.f = import_using_code(fcode) + if not isinstance(self.fcode, bytes): + self.fcode = base64.b64decode(self.fcode.encode("utf8")) + self.f = import_using_code(self.fcode) # Mapping from argname to default value. Should match only the last part # of the argnames list - fdefaults = self._getArg(kwargs, "func_defaults", {}) or {} - self.fdefaults = {name: deserialize_data(d) for name, d in fdefaults.items()} - logger.debug("Default values for function %s: %r", self.fname, self.fdefaults) + if isinstance(self.func_defaults, str): + self.func_defaults = ast.literal_eval(self.func_defaults) + + if self.pickle: + self.fdefaults = {name: deserialize_data(d) for name, d in self.func_defaults.items()} + else: + self.fdefaults = self.func_defaults + + logger.debug(f"Default values for function {self.func_name}: {self.fdefaults}") # Mapping between argument name and input drop uids - self.func_arg_mapping = self._getArg(kwargs, "func_arg_mapping", {}) - logger.debug("Input mapping: %r", self.func_arg_mapping) + logger.debug(f"Input mapping: {self.func_arg_mapping}") def run(self): # Inputs are un-pickled and treated as the arguments of the function # Their order must be preserved, so we use an OrderedDict - all_contents = lambda x: pickle.loads(droputils.allDropContents(x)) + if self.pickle: + all_contents = lambda x: pickle.loads(droputils.allDropContents(x)) + else: + all_contents = lambda x: ast.literal_eval(droputils.allDropContents(x).decode('utf-8')) + inputs = collections.OrderedDict() - for uid, i in self._inputs.items(): - inputs[uid] = all_contents(i) + for uid, drop in self._inputs.items(): + inputs[uid] = all_contents(drop) # Keyword arguments are made up by the default values plus the inputs # that match one of the keyword argument names argnames = inspect.getfullargspec(self.f).args + kwargs = { name: inputs.pop(uid) for name, uid in self.func_arg_mapping.items() @@ -152,7 +218,7 @@ def run(self): # The rest of the inputs are the positional arguments args = list(inputs.values()) - logger.debug("Running %s with args=%r, kwargs=%r", self.fname, args, kwargs) + logger.debug(f"Running {self.func_name} with args={args}, kwargs={kwargs}") result = self.f(*args, **kwargs) # Depending on how many outputs we have we treat our result @@ -162,4 +228,7 @@ def run(self): if len(outputs) == 1: result = [result] for r, o in zip(result, outputs): - o.write(pickle.dumps(r)) # @UndefinedVariable + if self.pickle: + o.write(pickle.dumps(r)) # @UndefinedVariable + else: + o.write(repr(r).encode('utf-8')) diff --git a/daliuge-engine/dlg/apps/scp.py b/daliuge-engine/dlg/apps/scp.py index afd84592b..7cff42b1d 100644 --- a/daliuge-engine/dlg/apps/scp.py +++ b/daliuge-engine/dlg/apps/scp.py @@ -19,8 +19,8 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, # MA 02111-1307 USA # -from ..remote import copyTo, copyFrom -from ..drop import ( +from dlg.remote import copyTo, copyFrom +from dlg.drop import ( BarrierAppDROP, NgasDROP, InMemoryDROP, @@ -28,8 +28,9 @@ RDBMSDrop, ContainerDROP, ) -from ..meta import ( +from dlg.meta import ( dlg_string_param, + dlg_float_param, dlg_component, dlg_batch_input, dlg_batch_output, @@ -37,6 +38,25 @@ ) +## +# @brief ScpApp +# @details A BarrierAppDROP that copies the content of its single input onto its +# single output via SSH's scp protocol. +# @par EAGLE_START +# @param category PythonApp +# @param[in] param/appclass Application Class/dlg.apps.scp.ScpApp/String/readonly/ +# \~English Application class +# @param[in] param/remoteUser Remote User//String/readwrite/ +# \~English Remote user address +# @param[in] param/pkeyPath Private Key Path//String/readwrite/ +# \~English Private key path +# @param[in] param/timeout Timeout//Float/readwrite/ +# \~English Connection timeout in seconds +# @param[in] port/file File/PathBasedDrop/ +# \~English Input file path +# @param[out] port/file File/PathBasedDrop/ +# \~English Output file path +# @par EAGLE_END class ScpApp(BarrierAppDROP): """ A BarrierAppDROP that copies the content of its single input onto its @@ -52,7 +72,7 @@ class ScpApp(BarrierAppDROP): two I/O DROPs. """ - compontent_meta = dlg_component( + component_meta = dlg_component( "ScpApp", "A BarrierAppDROP that copies the content of its single " "input onto its single output via SSHs scp protocol.", @@ -71,7 +91,7 @@ class ScpApp(BarrierAppDROP): remoteUser = dlg_string_param("remoteUser", None) pkeyPath = dlg_string_param("pkeyPath", None) - timeout = dlg_string_param("timeout", None) + timeout = dlg_float_param("timeout", None) def initialize(self, **kwargs): BarrierAppDROP.initialize(self, **kwargs) diff --git a/daliuge-engine/dlg/apps/simple.py b/daliuge-engine/dlg/apps/simple.py index 13c4cc694..940085403 100644 --- a/daliuge-engine/dlg/apps/simple.py +++ b/daliuge-engine/dlg/apps/simple.py @@ -27,18 +27,18 @@ import time import numpy as np -from .. import droputils, utils -from ..drop import BarrierAppDROP, BranchAppDrop, ContainerDROP -from ..meta import dlg_float_param, dlg_string_param -from ..meta import dlg_bool_param, dlg_int_param -from ..meta import dlg_component, dlg_batch_input -from ..meta import dlg_batch_output, dlg_streaming_input +from dlg import droputils, utils +from dlg.drop import BarrierAppDROP, BranchAppDrop, ContainerDROP +from dlg.meta import dlg_float_param, dlg_string_param +from dlg.meta import dlg_bool_param, dlg_int_param +from dlg.meta import dlg_component, dlg_batch_input +from dlg.meta import dlg_batch_output, dlg_streaming_input from dlg.apps.pyfunc import serialize_data, deserialize_data class NullBarrierApp(BarrierAppDROP): - compontent_meta = dlg_component( + component_meta = dlg_component( "NullBarrierApp", "Null Barrier.", [dlg_batch_input("binary/*", [])], @@ -67,7 +67,7 @@ def run(self): class SleepApp(BarrierAppDROP): """A BarrierAppDrop that sleeps the specified amount of time (0 by default)""" - compontent_meta = dlg_component( + component_meta = dlg_component( "SleepApp", "Sleep App.", [dlg_batch_input("binary/*", [])], @@ -102,7 +102,7 @@ class CopyApp(BarrierAppDROP): the graph. """ - compontent_meta = dlg_component( + component_meta = dlg_component( "CopyApp", "Copy App.", [dlg_batch_input("binary/*", [])], @@ -169,7 +169,7 @@ class RandomArrayApp(BarrierAppDROP): size: int, number of array elements """ - compontent_meta = dlg_component( + component_meta = dlg_component( "RandomArrayApp", "Random Array App.", [dlg_batch_input("binary/*", [])], @@ -245,7 +245,7 @@ class AverageArraysApp(BarrierAppDROP): from numpy import mean, median - compontent_meta = dlg_component( + component_meta = dlg_component( "RandomArrayApp", "Random Array App.", [dlg_batch_input("binary/*", [])], @@ -317,7 +317,7 @@ class HelloWorldApp(BarrierAppDROP): greet: string, [World], whom to greet. """ - compontent_meta = dlg_component( + component_meta = dlg_component( "HelloWorldApp", "Hello World App.", [dlg_batch_input("binary/*", [])], @@ -349,7 +349,7 @@ def run(self): ## # @brief UrlRetrieveApp -# @details A simple APP that retrieves the content of a URL and writes. +# @details A simple APP that retrieves the content of a URL and writes # it to all outputs. # @par EAGLE_START # @param category PythonApp @@ -368,7 +368,7 @@ class UrlRetrieveApp(BarrierAppDROP): URL: string, URL to retrieve. """ - compontent_meta = dlg_component( + component_meta = dlg_component( "UrlRetrieveApp", "URL Retrieve App", [dlg_batch_input("binary/*", [])], @@ -415,7 +415,7 @@ class GenericScatterApp(BarrierAppDROP): returns a numpy array of arrays, where the first axis is of length . """ - compontent_meta = dlg_component( + component_meta = dlg_component( "GenericScatterApp", "Scatter an array like object into numSplit parts", [dlg_batch_input("binary/*", [])], diff --git a/daliuge-engine/dlg/apps/socket_listener.py b/daliuge-engine/dlg/apps/socket_listener.py index 04cb72606..b1671b2fd 100644 --- a/daliuge-engine/dlg/apps/socket_listener.py +++ b/daliuge-engine/dlg/apps/socket_listener.py @@ -45,6 +45,29 @@ logger = logging.getLogger(__name__) +## +# @brief SocketListenerApp +# @details A BarrierAppDROP that listens on a socket for data. The server-side +# socket expects only one client, and assumes that the client will close the +# connection after all its data has been sent. +# This application expects no input DROPs, and therefore raises an +# exception whenever one is added. On the output side, one or more outputs +# can be specified with the restriction that they are not ContainerDROPs +# so data can be written into them through the framework. +# @par EAGLE_START +# @param category PythonApp +# @param[in] param/appclass Application Class/dlg.apps.socket_listener.SocketListener/String/readonly/ +# \~English Application class +# @param[in] param/host Host/127.0.0.1/String/readwrite/ +# \~English Host address +# @param[in] param/port Port/1111/Integer/readwrite/ +# \~English Host port +# @param[in] param/bufsize Buffer Size/4096/String/readwrite/ +# \~English Receive buffer size +# @param[in] param/reuseAddr Reuse Address/False/Boolean/readwrite/ +# \~English +# @param[out] port/data Data/String/ +# @par EAGLE_END class SocketListenerApp(BarrierAppDROP): """ A BarrierAppDROP that listens on a socket for data. The server-side @@ -59,7 +82,7 @@ class SocketListenerApp(BarrierAppDROP): _dryRun = False - compontent_meta = dlg_component( + component_meta = dlg_component( "SocketListenerApp", "A BarrierAppDROP that listens on a socket for data", [dlg_batch_input("binary/*", [])], diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index e8430638a..4bb0369de 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -24,6 +24,7 @@ """ from abc import ABCMeta, abstractmethod +import ast import base64 import collections import contextlib @@ -357,6 +358,8 @@ def getmembers(object, predicate=None): value = str(value) elif isinstance(obj, dlg_list_param): value = kwargs.get(attr_name, obj.default_value) + if isinstance(value, str): + value = ast.literal_eval(value) if value is not None and not isinstance(value, list): raise Exception( "dlg_list_param {} is not a list. It is a {}".format( @@ -365,6 +368,8 @@ def getmembers(object, predicate=None): ) elif isinstance(obj, dlg_dict_param): value = kwargs.get(attr_name, obj.default_value) + if isinstance(value, str): + value = ast.literal_eval(value) if value is not None and not isinstance(value, dict): raise Exception( "dlg_dict_param {} is not a dict. It is a {}".format( @@ -2058,7 +2063,7 @@ def initialize(self, **kwargs): object_id = self.uid if len(self.uid) != 20: object_id = np.random.bytes(20) - if self.object_id is None: + if not self.object_id: self.object_id = object_id def getIO(self): diff --git a/daliuge-engine/dlg/droputils.py b/daliuge-engine/dlg/droputils.py index 955d42213..0b008d41c 100644 --- a/daliuge-engine/dlg/droputils.py +++ b/daliuge-engine/dlg/droputils.py @@ -120,10 +120,9 @@ def allDropContents(drop, bufsize=4096): """ buf = io.BytesIO() desc = drop.open() - read = drop.read while True: - data = read(desc, bufsize) + data = drop.read(desc, bufsize) if not data: break buf.write(data) @@ -135,22 +134,21 @@ def copyDropContents(source, target, bufsize=4096): """ Manually copies data from one DROP into another, in bufsize steps """ - logger.debug("Copying from %r to %r" % (source, target)) + logger.debug(f"Copying from {repr(source)} to {repr(target)}") desc = source.open() - read = source.read - buf = read(desc, bufsize) - logger.debug("Read %d bytes from %r" % (len(buf), source)) + buf = source.read(desc, bufsize) + logger.debug(f"Read {len(buf)} bytes from {repr(source)}") while buf: target.write(buf) - logger.debug("Wrote %d bytes to %r" % (len(buf), target)) - buf = read(desc, bufsize) - logger.debug("Read %d bytes from %r" % (len(buf), source)) + logger.debug(f"Wrote {len(buf)} bytes to {repr(target)}") + buf = source.read(desc, bufsize) + logger.debug(f"Read {len(buf)} bytes from {repr(source)}") source.close(desc) def getUpstreamObjects(drop): """ - Returns a list of all direct "upstream" DROPs for the given + Returns a list of all direct "upstream" DROPs for the given+ DROP. An DROP A is "upstream" with respect to DROP B if any of the following conditions are true: diff --git a/daliuge-engine/test/apps/dynlib_example.c b/daliuge-engine/test/apps/dynlib_example.c index ca64bebf3..9aace7a7f 100644 --- a/daliuge-engine/test/apps/dynlib_example.c +++ b/daliuge-engine/test/apps/dynlib_example.c @@ -34,7 +34,7 @@ #include "dlg_app.h" /* -compontent_meta = dlg_component('dynlib_example', 'dynlib_example for dlg tests', +component_meta = dlg_component('dynlib_example', 'dynlib_example for dlg tests', [dlg_batch_input('binary/*', [])], [dlg_batch_output('binary/*', [])], [dlg_streaming_input('binary/*')])