diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 5b71f064a..5e0d738e6 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -27,7 +27,7 @@ from abc import ABCMeta, abstractmethod, abstractproperty import ast import base64 -import collections +from collections import OrderedDict import contextlib import errno import heapq @@ -373,7 +373,7 @@ def __init__(self, oid, uid, **kwargs): # All DROPs are precious unless stated otherwise; used for replication self._precious = self._getArg(kwargs, "precious", True) - # Useful to have access to all EAGLE parameters without a priori knowledge + # Useful to have access to all EAGLE parameters without a prior knowledge self._parameters = dict(kwargs) self.autofill_environment_variables() kwargs.update(self._parameters) @@ -384,38 +384,57 @@ def __init__(self, oid, uid, **kwargs): ) # no need to use synchronised self.status here def _extract_attributes(self, **kwargs): + """ + Extracts component and app params then assigns them to class instance attributes. + Component params take pro + """ def getmembers(object, predicate=None): for cls in object.__class__.__mro__[:-1]: for k, v in vars(cls).items(): if not predicate or predicate(v): yield k, v + def get_param_value(attr_name, default_value): + has_component_param = attr_name in kwargs + has_app_param = 'applicationArgs' in kwargs \ + and attr_name in kwargs['applicationArgs'] + + if has_component_param and has_app_param: + logger.warning(f"Drop has both component and app param {attr_name}. Using component param.") + if has_component_param: + param = kwargs.get(attr_name) + elif has_app_param: + param = kwargs['applicationArgs'].get(attr_name).value + else: + param = default_value + return param + # Take a class dlg defined parameter class attribute and create an instanced attribute on object - for attr_name, obj in getmembers( + for attr_name, member in getmembers( self, lambda a: not (inspect.isfunction(a) or isinstance(a, property)) ): - if isinstance(obj, dlg_float_param): - value = kwargs.get(attr_name, obj.default_value) + if isinstance(member, dlg_float_param): + value = get_param_value(attr_name, member.default_value) if value is not None and value != "": value = float(value) - elif isinstance(obj, dlg_bool_param): - value = kwargs.get(attr_name, obj.default_value) + elif isinstance(member, dlg_bool_param): + value = get_param_value(attr_name, member.default_value) if value is not None and value != "": value = bool(value) - elif isinstance(obj, dlg_int_param): - value = kwargs.get(attr_name, obj.default_value) + elif isinstance(member, dlg_int_param): + value = get_param_value(attr_name, member.default_value) if value is not None and value != "": value = int(value) - elif isinstance(obj, dlg_string_param): - value = kwargs.get(attr_name, obj.default_value) + elif isinstance(member, dlg_string_param): + value = get_param_value(attr_name, member.default_value) if value is not None and value != "": value = str(value) - elif isinstance(obj, dlg_enum_param): - value = kwargs.get(attr_name, obj.default_value) + elif isinstance(member, dlg_enum_param): + value = get_param_value(attr_name, member.default_value) if value is not None and value != "": - value = obj.cls(value) - elif isinstance(obj, dlg_list_param): - value = kwargs.get(attr_name, obj.default_value) + value = member.cls(value) + elif isinstance(member, dlg_list_param): + value = get_param_value(attr_name, member.default_value) if isinstance(value, str): if value == "": value = [] @@ -425,8 +444,8 @@ def getmembers(object, predicate=None): raise Exception( f"dlg_list_param {attr_name} is not a list. Type is {type(value)}" ) - elif isinstance(obj, dlg_dict_param): - value = kwargs.get(attr_name, obj.default_value) + elif isinstance(member, dlg_dict_param): + value = get_param_value(attr_name, member.default_value) if isinstance(value, str): if value == "": value = {} @@ -443,6 +462,9 @@ def getmembers(object, predicate=None): setattr(self, attr_name, value) def _getArg(self, kwargs, key, default): + """ + Pops the specified key arg from kwargs else returns the default + """ val = default if key in kwargs: val = kwargs.pop(key) @@ -2288,12 +2310,12 @@ def initialize(self, **kwargs): # Input and output objects are later referenced by their *index* # (relative to the order in which they were added to this object) # Therefore we use an ordered dict to keep the insertion order. - self._inputs = collections.OrderedDict() - self._outputs = collections.OrderedDict() + self._inputs = OrderedDict() + self._outputs = OrderedDict() # Same as above, only that these correspond to the 'streaming' version # of the consumers - self._streamingInputs = collections.OrderedDict() + self._streamingInputs = OrderedDict() # An AppDROP has a second, separate state machine indicating its # execution status. @@ -2353,6 +2375,30 @@ def streamingInputs(self) -> List[DataDROP]: """ return list(self._streamingInputs.values()) + def _generateNamedInputs(self): + """ + Generates a named mapping of input data drops. Can only be called during run(). + """ + named_inputs: OrderedDict[str, DataDROP] = OrderedDict() + if 'inputs' in self.parameters and isinstance(self.parameters['inputs'][0], dict): + for i in range(len(self._inputs)): + key = list(self.parameters['inputs'][i].values())[0] + value = self._inputs[list(self.parameters['inputs'][i].keys())[0]] + named_inputs[key] = value + return named_inputs + + def _generateNamedOutputs(self): + """ + Generates a named mapping of output data drops. Can only be called during run(). + """ + named_outputs: OrderedDict[str, DataDROP] = OrderedDict() + if 'outputs' in self.parameters and isinstance(self.parameters['outputs'][0], dict): + for i in range(len(self._outputs)): + key = list(self.parameters['outputs'][i].values())[0] + value = self._outputs[list(self.parameters['outputs'][i].keys())[0]] + named_outputs[key] = value + return named_outputs + def handleEvent(self, e): """ Handles the arrival of a new event. Events are delivered from those