diff --git a/daliuge-common/docker/Dockerfile.devcuda b/daliuge-common/docker/Dockerfile.devcuda index 103b6c598..dd26aaf1c 100644 --- a/daliuge-common/docker/Dockerfile.devcuda +++ b/daliuge-common/docker/Dockerfile.devcuda @@ -22,7 +22,7 @@ RUN DEBIAN_FRONTEND=noninteractive apt install -y wget gnupg2 software-propertie RUN mkdir -p /code && cd /code &&\ wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/cuda-ubuntu2004.pin &&\ mv cuda-ubuntu2004.pin /etc/apt/preferences.d/cuda-repository-pin-600 &&\ - apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/7fa2af80.pub &&\ + apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/3bf863cc.pub &&\ add-apt-repository "deb https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/ /" &&\ apt update diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index c3b9da275..085f6452f 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -24,6 +24,7 @@ import ast import base64 import collections +from enum import Enum import importlib import inspect import logging @@ -40,6 +41,7 @@ from dlg.meta import ( dlg_bool_param, dlg_string_param, + dlg_enum_param, dlg_float_param, dlg_dict_param, dlg_component, @@ -104,13 +106,21 @@ def import_using_name(app, fname): ) except AttributeError: raise InvalidDropException(app, "Module %s has no member %s" % (modname, fname)) - + def import_using_code(code): return dill.loads(code) +class DropParser(Enum): + PICKLE = 'pickle' + EVAL = 'eval' + PATH = 'path' + DATAURL = 'dataurl' + NPY = 'npy' + #JSON = "json" + ## # @brief PyFuncApp # @details An application that wraps a simple python function. @@ -138,8 +148,10 @@ def import_using_code(code): # \~English Python function name # @param[in] aparam/func_code Function Code//String/readwrite/False//False/ # \~English Python function code, e.g. 'def function_name(args): return args' -# @param[in] aparam/pickle Pickle/false/Boolean/readwrite/False//False/ -# \~English Whether the python arguments are pickled. +# @param[in] aparam/input_parser Input Parser/pickle/Select/readwrite/False/pickle,eval,path,dataurl,npy/False/ +# \~English Input port parsing technique +# @param[in] aparam/output_parser Output Parser/pickle/Select/readwrite/False/pickle,eval,path,dataurl,npy/False/ +# \~English Output port parsing technique # @param[in] aparam/func_defaults Function Defaults//String/readwrite/False//False/ # \~English Mapping from argname to default value. Should match only the last part of the argnames list. # Values are interpreted as Python code literals and that means string values need to be quoted. 
@@ -193,15 +205,11 @@ class PyFuncApp(BarrierAppDROP): ) func_name = dlg_string_param("func_name", None) - # func_code = dlg_bytes_param("func_code", None) # bytes or base64 string - - pickle = dlg_bool_param("pickle", True) - + input_parser: DropParser = dlg_enum_param(DropParser, "input_parser", DropParser.PICKLE) # type: ignore + output_parser: DropParser = dlg_enum_param(DropParser, "output_parser", DropParser.PICKLE) # type: ignore func_arg_mapping = dlg_dict_param("func_arg_mapping", {}) - func_defaults = dlg_dict_param("func_defaults", {}) - f: Callable fdefaults: dict @@ -235,7 +243,7 @@ def _init_func_defaults(self): + "{self.f.__name__}: {self.func_defaults}, {type(self.func_defaults)}" ) raise ValueError - if self.pickle: + if DropParser(self.input_parser) is DropParser.PICKLE: # only values are pickled, get them unpickled for name, value in self.func_defaults.items(): self.func_defaults[name] = deserialize_data(value) @@ -284,7 +292,8 @@ def initialize(self, **kwargs): "func_code", "func_name", "func_arg_mapping", - "pickle", + "input_parser", + "output_parser", "func_defaults" ] for kw in self.func_def_keywords: @@ -363,7 +372,7 @@ def run(self): Function arguments in Python can be passed as positional, kw-value, positional only, kw-value only, and catch-all args and kwargs, which don't provide any hint about the names of accepted parameters. All of them are now supported. If - positional arguments or kw-value arguments are provided by the user, but are + positional arguments or kw-value arguments are provided by the user, but are not explicitely defined in the function signiture AND args and/or kwargs are allowed then these arguments are passed to the function. For args this is somewhat risky, since the order is relevant and in this code derived from the @@ -381,12 +390,16 @@ def run(self): # Inputs are un-pickled and treated as the arguments of the function # Their order must be preserved, so we use an OrderedDict - if self.pickle: + if DropParser(self.input_parser) is DropParser.PICKLE: all_contents = lambda x: pickle.loads(x) + elif DropParser(self.input_parser) is DropParser.EVAL: + all_contents = lambda x: ast.literal_eval(droputils.allDropContents(x).decode('utf-8')) + elif DropParser(self.input_parser) is DropParser.PATH: + all_contents = lambda x: x.path + elif DropParser(self.input_parser) is DropParser.DATAURL: + all_contents = lambda x: x.dataurl else: - all_contents = lambda x: ast.literal_eval( - droputils.allDropContents(x).decode("utf-8") - ) + raise ValueError(self.input_parser.__repr__()) inputs = collections.OrderedDict() for uid, drop in self._inputs.items(): @@ -455,15 +468,20 @@ def run(self): if ptype in ["Complex", "Json"]: try: value = ast.literal_eval(value) + except ValueError: + pass + elif ptype in ["Python"]: + try: + value = eval(value) except: pass pargsDict.update({ pa: value }) - elif pa != 'self' and pa not in pargsDict: + elif pa != 'self': logger.warning(f"Required positional argument '{pa}' not found!") - logger.debug(f"updating posargs with {list(pargsDict.values())}") + logger.debug(f"updating posargs with {list(kwargs.values())}") self.pargs.extend(list(pargsDict.values())) # Try to get values for still missing kwargs arguments from Application kws @@ -536,12 +554,13 @@ def write_results(self, result): if len(outputs) == 1: result = [result] for r, o in zip(result, outputs): - p = pickle.dumps(r) - if self.pickle: + if DropParser(self.output_parser) is DropParser.PICKLE: logger.debug(f"Writing pickeled result {type(r)} to {o}") - 
o.write(pickle.dumps(r)) # @UndefinedVariable + o.write(pickle.dumps(r)) + elif DropParser(self.output_parser) is DropParser.EVAL: + o.write(repr(r).encode('utf-8')) else: - o.write(repr(r).encode("utf-8")) + raise ValueError(self.output_parser.__repr__()) def generate_recompute_data(self): return self._recompute_data diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 949194e7e..0fadf63fc 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -27,7 +27,7 @@ from abc import ABCMeta, abstractmethod, abstractproperty import ast import base64 -import collections +from collections import OrderedDict import contextlib import errno import heapq @@ -111,6 +111,7 @@ dlg_int_param, dlg_list_param, dlg_string_param, + dlg_enum_param, dlg_bool_param, dlg_dict_param, ) @@ -209,6 +210,8 @@ def __init__(self, oid, uid, **kwargs): super(AbstractDROP, self).__init__() + # Useful to have access to all EAGLE parameters without a priori knowledge + self._parameters = dict(kwargs) self._extract_attributes(**kwargs) # Copy it since we're going to modify it @@ -372,8 +375,6 @@ def __init__(self, oid, uid, **kwargs): # All DROPs are precious unless stated otherwise; used for replication self._precious = self._getArg(kwargs, "precious", True) - # Useful to have access to all EAGLE parameters without a priori knowledge - self._parameters = dict(kwargs) self.autofill_environment_variables() kwargs.update(self._parameters) # Sub-class initialization; mark ourselves as INITIALIZED after that @@ -383,34 +384,58 @@ def __init__(self, oid, uid, **kwargs): ) # no need to use synchronised self.status here def _extract_attributes(self, **kwargs): + """ + Extracts component and app params then assigns them to class instance attributes. + Component params take precedence over application params of the same name. + """ def getmembers(object, predicate=None): for cls in object.__class__.__mro__[:-1]: for k, v in vars(cls).items(): if not predicate or predicate(v): yield k, v + def get_param_value(attr_name, default_value): + has_component_param = attr_name in kwargs + has_app_param = hasattr(self, 'parameters') \ + and 'applicationArgs' in self.parameters \ + and attr_name in self.parameters['applicationArgs'] + + if has_component_param and has_app_param: + logger.warning(f"Drop has both component and app param {attr_name}. 
Using component param.") + if has_component_param: + param = kwargs.get(attr_name) + elif has_app_param: + param = self.parameters['applicationArgs'].get(attr_name).value + else: + param = default_value + return param + # Take a class dlg defined parameter class attribute and create an instanced attribute on object - for attr_name, obj in getmembers( + for attr_name, member in getmembers( self, lambda a: not (inspect.isfunction(a) or isinstance(a, property)) ): - if isinstance(obj, dlg_float_param): - value = kwargs.get(attr_name, obj.default_value) + if isinstance(member, dlg_float_param): + value = get_param_value(attr_name, member.default_value) if value is not None and value != "": value = float(value) - elif isinstance(obj, dlg_bool_param): - value = kwargs.get(attr_name, obj.default_value) + elif isinstance(member, dlg_bool_param): + value = get_param_value(attr_name, member.default_value) if value is not None and value != "": value = bool(value) - elif isinstance(obj, dlg_int_param): - value = kwargs.get(attr_name, obj.default_value) + elif isinstance(member, dlg_int_param): + value = get_param_value(attr_name, member.default_value) if value is not None and value != "": value = int(value) - elif isinstance(obj, dlg_string_param): - value = kwargs.get(attr_name, obj.default_value) + elif isinstance(member, dlg_string_param): + value = get_param_value(attr_name, member.default_value) if value is not None and value != "": value = str(value) - elif isinstance(obj, dlg_list_param): - value = kwargs.get(attr_name, obj.default_value) + elif isinstance(member, dlg_enum_param): + value = get_param_value(attr_name, member.default_value) + if value is not None and value != "": + value = member.cls(value) + elif isinstance(member, dlg_list_param): + value = get_param_value(attr_name, member.default_value) if isinstance(value, str): if value == "": value = [] @@ -418,12 +443,10 @@ def getmembers(object, predicate=None): value = ast.literal_eval(value) if value is not None and not isinstance(value, list): raise Exception( - "dlg_list_param {} is not a list. It is a {}".format( - attr_name, type(value) - ) + f"dlg_list_param {attr_name} is not a list. Type is {type(value)}" ) - elif isinstance(obj, dlg_dict_param): - value = kwargs.get(attr_name, obj.default_value) + elif isinstance(member, dlg_dict_param): + value = get_param_value(attr_name, member.default_value) if isinstance(value, str): if value == "": value = {} @@ -2285,12 +2308,12 @@ def initialize(self, **kwargs): # Input and output objects are later referenced by their *index* # (relative to the order in which they were added to this object) # Therefore we use an ordered dict to keep the insertion order. - self._inputs = collections.OrderedDict() - self._outputs = collections.OrderedDict() + self._inputs = OrderedDict() + self._outputs = OrderedDict() # Same as above, only that these correspond to the 'streaming' version # of the consumers - self._streamingInputs = collections.OrderedDict() + self._streamingInputs = OrderedDict() # An AppDROP has a second, separate state machine indicating its # execution status. @@ -2350,6 +2373,30 @@ def streamingInputs(self) -> List[DataDROP]: """ return list(self._streamingInputs.values()) + def _generateNamedInputs(self): + """ + Generates a named mapping of input data drops. Can only be called during run(). 
+ """ + named_inputs: OrderedDict[str, DataDROP] = OrderedDict() + if 'inputs' in self.parameters and isinstance(self.parameters['inputs'][0], dict): + for i in range(len(self._inputs)): + key = list(self.parameters['inputs'][i].values())[0] + value = self._inputs[list(self.parameters['inputs'][i].keys())[0]] + named_inputs[key] = value + return named_inputs + + def _generateNamedOutputs(self): + """ + Generates a named mapping of output data drops. Can only be called during run(). + """ + named_outputs: OrderedDict[str, DataDROP] = OrderedDict() + if 'outputs' in self.parameters and isinstance(self.parameters['outputs'][0], dict): + for i in range(len(self._outputs)): + key = list(self.parameters['outputs'][i].values())[0] + value = self._outputs[list(self.parameters['outputs'][i].keys())[0]] + named_outputs[key] = value + return named_outputs + def handleEvent(self, e): """ Handles the arrival of a new event. Events are delivered from those diff --git a/daliuge-engine/dlg/meta.py b/daliuge-engine/dlg/meta.py index ed2bf13b4..2e88bd0b4 100644 --- a/daliuge-engine/dlg/meta.py +++ b/daliuge-engine/dlg/meta.py @@ -28,6 +28,7 @@ dlg_string_param = collections.namedtuple( "dlg_string_param", "description default_value" ) +dlg_enum_param = collections.namedtuple("dlg_enum_param", "cls description default_value") dlg_list_param = collections.namedtuple("dlg_list_param", "description default_value") dlg_dict_param = collections.namedtuple("dlg_dict_param", "description default_value") diff --git a/daliuge-engine/docker/Dockerfile.devall b/daliuge-engine/docker/Dockerfile.devall index a3efa6c07..3a98e586b 100644 --- a/daliuge-engine/docker/Dockerfile.devall +++ b/daliuge-engine/docker/Dockerfile.devall @@ -26,10 +26,20 @@ ENV DLG_ROOT="/tmp/dlg" RUN apt install -y git python3-dev -RUN pip install git+https://gitlab.com/ska-telescope/sdp/ska-gridder-nifty-cuda.git -RUN pip install dlg-nifty-components>=1.1.0 - -RUN pip install 'ska-sdp-realtime-receive-modules[plasma]==2.0.1' 'ska-sdp-cbf-emulator==2.0.1' --extra-index-url=https://artefact.skao.int/repository/pypi-internal/simple -RUN pip install dlg-casacore-components>=0.3.0 +# Nifty +#RUN pip install --prefix=$PYTHON_PREFIX git+https://gitlab.com/ska-telescope/sdp/ska-gridder-nifty-cuda.git +#RUN pip install --prefix=$PYTHON_PREFIX dlg-nifty-components + +# Casacore + SDP +#RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple --prefix=$PYTHON_PREFIX ska-sdp-dal-schemas +#RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple --prefix=$PYTHON_PREFIX ska-sdp-realtime-receive-core[plasma] +#RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple --prefix=$PYTHON_PREFIX ska-sdp-realtime-receive-modules[plasma] +#RUN pip install dlg-casacore-components + +# RASCIL +# RUN mkdir -p /tmp/rascil_data && cd /tmp/rascil_data &&\ +# curl https://ska-telescope.gitlab.io/external/rascil/rascil_data.tgz -o rascil_data.tgz +# RUN tar zxf rascil_data.tgz -C /dlg/lib/python3.8/site-packages +# RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple rascil CMD ["dlg", "daemon", "-vv"] \ No newline at end of file diff --git a/daliuge-engine/test/apps/test_pyfunc.py b/daliuge-engine/test/apps/test_pyfunc.py index 8eb15dac7..4f7448e43 100644 --- a/daliuge-engine/test/apps/test_pyfunc.py +++ b/daliuge-engine/test/apps/test_pyfunc.py @@ -79,8 +79,9 @@ def _PyFuncApp(oid, uid, f, **kwargs): func_name=fname, func_code=fcode, func_defaults=fdefaults, - pickle=True, - 
**kwargs, + input_parser=pyfunc.DropParser.PICKLE, + output_parser=pyfunc.DropParser.PICKLE, + **kwargs ) diff --git a/daliuge-engine/test/apps/test_simple.py b/daliuge-engine/test/apps/test_simple.py index e34df044e..1296a6dad 100644 --- a/daliuge-engine/test/apps/test_simple.py +++ b/daliuge-engine/test/apps/test_simple.py @@ -68,7 +68,7 @@ def test_sleepapp(self): b.addInput(a) b.addOutput(c) - a = NullDROP("a", "a") + self._test_graph_runs((a, b, c), a, c) def _test_copyapp_simple(self, app): diff --git a/daliuge-engine/test/graphs/funcTestPG_namedPorts.graph b/daliuge-engine/test/graphs/funcTestPG_namedPorts.graph index 6c08e226f..93c9db0e0 100644 --- a/daliuge-engine/test/graphs/funcTestPG_namedPorts.graph +++ b/daliuge-engine/test/graphs/funcTestPG_namedPorts.graph @@ -37,15 +37,38 @@ "options": [], "positional": false }, - "pickle": { - "text": "Pickle", - "value": false, - "defaultValue": "", - "description": "Whether the python arguments are pickled.", + "input_parser": { + "text": "Input Parser", + "value": "eval", + "defaultValue": "pickle", + "description": "Input port parsing technique", "readonly": false, - "type": "Boolean", + "type": "Select", "precious": false, - "options": [], + "options": [ + "pickle", + "eval", + "path", + "dataurl", + "npy" + ], + "positional": false + }, + "output_parser": { + "text": "Output Parser", + "value": "eval", + "defaultValue": "pickle", + "description": "Output port parsing technique", + "readonly": false, + "type": "Select", + "precious": false, + "options": [ + "pickle", + "eval", + "path", + "dataurl", + "npy" + ], "positional": false }, "func_defaults": { diff --git a/daliuge-engine/test/graphs/pyfunc_glob_testPG.graph b/daliuge-engine/test/graphs/pyfunc_glob_testPG.graph index 7813e476f..788e6d234 100644 --- a/daliuge-engine/test/graphs/pyfunc_glob_testPG.graph +++ b/daliuge-engine/test/graphs/pyfunc_glob_testPG.graph @@ -37,15 +37,38 @@ "options": [], "positional": false }, - "pickle": { - "text": "Pickle", - "value": false, - "defaultValue": "", - "description": "Whether the python arguments are pickled.", + "input_parser": { + "text": "Input Parser", + "value": "eval", + "defaultValue": "pickle", + "description": "Input port parsing technique", "readonly": false, - "type": "Boolean", + "type": "Select", "precious": false, - "options": [], + "options": [ + "pickle", + "eval", + "path", + "dataurl", + "npy" + ], + "positional": false + }, + "output_parser": { + "text": "Output Parser", + "value": "eval", + "defaultValue": "pickle", + "description": "Output port parsing technique", + "readonly": false, + "type": "Select", + "precious": false, + "options": [ + "pickle", + "eval", + "path", + "dataurl", + "npy" + ], "positional": false }, "func_defaults": {