From 74ca1ddd0c1057b2a0fc51487969747d851eb273 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Fri, 20 May 2022 14:20:36 +0800 Subject: [PATCH 1/5] automatic app param reading # Conflicts: # daliuge-engine/dlg/drop.py --- daliuge-engine/dlg/drop.py | 84 +++++++++++++++++++++++++++++--------- 1 file changed, 64 insertions(+), 20 deletions(-) diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 5b71f064a..60da351cd 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -27,7 +27,7 @@ from abc import ABCMeta, abstractmethod, abstractproperty import ast import base64 -import collections +from collections import OrderedDict import contextlib import errno import heapq @@ -210,6 +210,8 @@ def __init__(self, oid, uid, **kwargs): super(AbstractDROP, self).__init__() + # Useful to have access to all EAGLE parameters without a priori knowledge + self._parameters = dict(kwargs) self._extract_attributes(**kwargs) # Copy it since we're going to modify it @@ -373,8 +375,6 @@ def __init__(self, oid, uid, **kwargs): # All DROPs are precious unless stated otherwise; used for replication self._precious = self._getArg(kwargs, "precious", True) - # Useful to have access to all EAGLE parameters without a priori knowledge - self._parameters = dict(kwargs) self.autofill_environment_variables() kwargs.update(self._parameters) # Sub-class initialization; mark ourselves as INITIALIZED after that @@ -384,38 +384,58 @@ def __init__(self, oid, uid, **kwargs): ) # no need to use synchronised self.status here def _extract_attributes(self, **kwargs): + """ + Extracts component and app params then assigns them to class instance attributes. + Component params take pro + """ def getmembers(object, predicate=None): for cls in object.__class__.__mro__[:-1]: for k, v in vars(cls).items(): if not predicate or predicate(v): yield k, v + def get_param_value(attr_name, default_value): + has_component_param = attr_name in kwargs + has_app_param = hasattr(self, 'parameters') \ + and 'applicationArgs' in self.parameters \ + and attr_name in self.parameters['applicationArgs'] + + if has_component_param and has_app_param: + logger.warning(f"Drop has both component and app param {attr_name}. Using component param.") + if has_component_param: + param = kwargs.get(attr_name) + elif has_app_param: + param = self.parameters['applicationArgs'].get(attr_name).value + else: + param = default_value + return param + # Take a class dlg defined parameter class attribute and create an instanced attribute on object - for attr_name, obj in getmembers( + for attr_name, member in getmembers( self, lambda a: not (inspect.isfunction(a) or isinstance(a, property)) ): - if isinstance(obj, dlg_float_param): - value = kwargs.get(attr_name, obj.default_value) + if isinstance(member, dlg_float_param): + value = get_param_value(attr_name, member.default_value) if value is not None and value != "": value = float(value) - elif isinstance(obj, dlg_bool_param): - value = kwargs.get(attr_name, obj.default_value) + elif isinstance(member, dlg_bool_param): + value = get_param_value(attr_name, member.default_value) if value is not None and value != "": value = bool(value) - elif isinstance(obj, dlg_int_param): - value = kwargs.get(attr_name, obj.default_value) + elif isinstance(member, dlg_int_param): + value = get_param_value(attr_name, member.default_value) if value is not None and value != "": value = int(value) - elif isinstance(obj, dlg_string_param): - value = kwargs.get(attr_name, obj.default_value) + elif isinstance(member, dlg_string_param): + value = get_param_value(attr_name, member.default_value) if value is not None and value != "": value = str(value) elif isinstance(obj, dlg_enum_param): - value = kwargs.get(attr_name, obj.default_value) + value = get_param_value(attr_name, member.default_value) if value is not None and value != "": value = obj.cls(value) - elif isinstance(obj, dlg_list_param): - value = kwargs.get(attr_name, obj.default_value) + elif isinstance(member, dlg_list_param): + value = get_param_value(attr_name, member.default_value) if isinstance(value, str): if value == "": value = [] @@ -425,8 +445,8 @@ def getmembers(object, predicate=None): raise Exception( f"dlg_list_param {attr_name} is not a list. Type is {type(value)}" ) - elif isinstance(obj, dlg_dict_param): - value = kwargs.get(attr_name, obj.default_value) + elif isinstance(member, dlg_dict_param): + value = get_param_value(attr_name, member.default_value) if isinstance(value, str): if value == "": value = {} @@ -2288,12 +2308,12 @@ def initialize(self, **kwargs): # Input and output objects are later referenced by their *index* # (relative to the order in which they were added to this object) # Therefore we use an ordered dict to keep the insertion order. - self._inputs = collections.OrderedDict() - self._outputs = collections.OrderedDict() + self._inputs = OrderedDict() + self._outputs = OrderedDict() # Same as above, only that these correspond to the 'streaming' version # of the consumers - self._streamingInputs = collections.OrderedDict() + self._streamingInputs = OrderedDict() # An AppDROP has a second, separate state machine indicating its # execution status. @@ -2353,6 +2373,30 @@ def streamingInputs(self) -> List[DataDROP]: """ return list(self._streamingInputs.values()) + def _generateNamedInputs(self): + """ + Generates a named mapping of input data drops. Can only be called during run(). + """ + named_inputs: OrderedDict[str, DataDROP] = OrderedDict() + if 'inputs' in self.parameters and isinstance(self.parameters['inputs'][0], dict): + for i in range(len(self._inputs)): + key = list(self.parameters['inputs'][i].values())[0] + value = self._inputs[list(self.parameters['inputs'][i].keys())[0]] + named_inputs[key] = value + return named_inputs + + def _generateNamedOutputs(self): + """ + Generates a named mapping of output data drops. Can only be called during run(). + """ + named_outputs: OrderedDict[str, DataDROP] = OrderedDict() + if 'outputs' in self.parameters and isinstance(self.parameters['outputs'][0], dict): + for i in range(len(self._outputs)): + key = list(self.parameters['outputs'][i].values())[0] + value = self._outputs[list(self.parameters['outputs'][i].keys())[0]] + named_outputs[key] = value + return named_outputs + def handleEvent(self, e): """ Handles the arrival of a new event. Events are delivered from those From 3a5e99c34398683dd60e9e1b3611bc7e095b2fdf Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Fri, 20 May 2022 14:46:01 +0800 Subject: [PATCH 2/5] merge bugfix --- daliuge-engine/dlg/drop.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 60da351cd..0fadf63fc 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -430,10 +430,10 @@ def get_param_value(attr_name, default_value): value = get_param_value(attr_name, member.default_value) if value is not None and value != "": value = str(value) - elif isinstance(obj, dlg_enum_param): + elif isinstance(member, dlg_enum_param): value = get_param_value(attr_name, member.default_value) if value is not None and value != "": - value = obj.cls(value) + value = member.cls(value) elif isinstance(member, dlg_list_param): value = get_param_value(attr_name, member.default_value) if isinstance(value, str): From 17310184ece65bc08f56fc95e7ebf0da47c1cb76 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Fri, 20 May 2022 15:21:00 +0800 Subject: [PATCH 3/5] test update --- daliuge-engine/test/test_environmentvars.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daliuge-engine/test/test_environmentvars.py b/daliuge-engine/test/test_environmentvars.py index 97aeb4247..159b81baf 100644 --- a/daliuge-engine/test/test_environmentvars.py +++ b/daliuge-engine/test/test_environmentvars.py @@ -66,7 +66,7 @@ def test_get_empty(self): Tests that an empty environment drop contains no environment variables. """ env_drop = create_empty_env_vars() - self.assertEqual(dict(), env_drop._variables) + self.assertEqual({'nm': 'env_vars'}, env_drop._variables) def test_get_multiple(self): """ From 696abd71af93d03f8cf5eeb90c0410d881762311 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Fri, 20 May 2022 15:28:03 +0800 Subject: [PATCH 4/5] limit eval scope --- daliuge-engine/dlg/apps/pyfunc.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index 085f6452f..0deb323e9 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -472,13 +472,11 @@ def run(self): pass elif ptype in ["Python"]: try: - value = eval(value) + import numpy + value = eval(value, {"numpy": numpy}, {}) except: pass - pargsDict.update({ - pa: - value - }) + pargsDict.update({pa: value}) elif pa != 'self': logger.warning(f"Required positional argument '{pa}' not found!") logger.debug(f"updating posargs with {list(kwargs.values())}") From ee74b9737c55b70385c086273bcf9ab5ff30dbd5 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Tue, 24 May 2022 12:11:16 +0800 Subject: [PATCH 5/5] repair test --- daliuge-engine/dlg/drop.py | 14 ++++++++------ daliuge-engine/test/test_environmentvars.py | 2 +- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 0fadf63fc..5e0d738e6 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -210,8 +210,6 @@ def __init__(self, oid, uid, **kwargs): super(AbstractDROP, self).__init__() - # Useful to have access to all EAGLE parameters without a priori knowledge - self._parameters = dict(kwargs) self._extract_attributes(**kwargs) # Copy it since we're going to modify it @@ -375,6 +373,8 @@ def __init__(self, oid, uid, **kwargs): # All DROPs are precious unless stated otherwise; used for replication self._precious = self._getArg(kwargs, "precious", True) + # Useful to have access to all EAGLE parameters without a prior knowledge + self._parameters = dict(kwargs) self.autofill_environment_variables() kwargs.update(self._parameters) # Sub-class initialization; mark ourselves as INITIALIZED after that @@ -396,16 +396,15 @@ def getmembers(object, predicate=None): def get_param_value(attr_name, default_value): has_component_param = attr_name in kwargs - has_app_param = hasattr(self, 'parameters') \ - and 'applicationArgs' in self.parameters \ - and attr_name in self.parameters['applicationArgs'] + has_app_param = 'applicationArgs' in kwargs \ + and attr_name in kwargs['applicationArgs'] if has_component_param and has_app_param: logger.warning(f"Drop has both component and app param {attr_name}. Using component param.") if has_component_param: param = kwargs.get(attr_name) elif has_app_param: - param = self.parameters['applicationArgs'].get(attr_name).value + param = kwargs['applicationArgs'].get(attr_name).value else: param = default_value return param @@ -463,6 +462,9 @@ def get_param_value(attr_name, default_value): setattr(self, attr_name, value) def _getArg(self, kwargs, key, default): + """ + Pops the specified key arg from kwargs else returns the default + """ val = default if key in kwargs: val = kwargs.pop(key) diff --git a/daliuge-engine/test/test_environmentvars.py b/daliuge-engine/test/test_environmentvars.py index 159b81baf..97aeb4247 100644 --- a/daliuge-engine/test/test_environmentvars.py +++ b/daliuge-engine/test/test_environmentvars.py @@ -66,7 +66,7 @@ def test_get_empty(self): Tests that an empty environment drop contains no environment variables. """ env_drop = create_empty_env_vars() - self.assertEqual({'nm': 'env_vars'}, env_drop._variables) + self.assertEqual(dict(), env_drop._variables) def test_get_multiple(self): """