Skip to content

Commit

Permalink
Merge pull request #159 from ICRAR/LIU-251-app-params
Browse files Browse the repository at this point in the history
LIU-251 Automatically Parse App Args to Params
  • Loading branch information
calgray committed May 24, 2022
2 parents 980bd2f + ee74b97 commit 046c159
Showing 1 changed file with 67 additions and 21 deletions.
88 changes: 67 additions & 21 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from abc import ABCMeta, abstractmethod, abstractproperty
import ast
import base64
import collections
from collections import OrderedDict
import contextlib
import errno
import heapq
Expand Down Expand Up @@ -373,7 +373,7 @@ def __init__(self, oid, uid, **kwargs):
# All DROPs are precious unless stated otherwise; used for replication
self._precious = self._getArg(kwargs, "precious", True)

# Useful to have access to all EAGLE parameters without a priori knowledge
# Useful to have access to all EAGLE parameters without a prior knowledge
self._parameters = dict(kwargs)
self.autofill_environment_variables()
kwargs.update(self._parameters)
Expand All @@ -384,38 +384,57 @@ def __init__(self, oid, uid, **kwargs):
) # no need to use synchronised self.status here

def _extract_attributes(self, **kwargs):
"""
Extracts component and app params then assigns them to class instance attributes.
Component params take pro
"""
def getmembers(object, predicate=None):
for cls in object.__class__.__mro__[:-1]:
for k, v in vars(cls).items():
if not predicate or predicate(v):
yield k, v

def get_param_value(attr_name, default_value):
has_component_param = attr_name in kwargs
has_app_param = 'applicationArgs' in kwargs \
and attr_name in kwargs['applicationArgs']

if has_component_param and has_app_param:
logger.warning(f"Drop has both component and app param {attr_name}. Using component param.")
if has_component_param:
param = kwargs.get(attr_name)
elif has_app_param:
param = kwargs['applicationArgs'].get(attr_name).value
else:
param = default_value
return param

# Take a class dlg defined parameter class attribute and create an instanced attribute on object
for attr_name, obj in getmembers(
for attr_name, member in getmembers(
self, lambda a: not (inspect.isfunction(a) or isinstance(a, property))
):
if isinstance(obj, dlg_float_param):
value = kwargs.get(attr_name, obj.default_value)
if isinstance(member, dlg_float_param):
value = get_param_value(attr_name, member.default_value)
if value is not None and value != "":
value = float(value)
elif isinstance(obj, dlg_bool_param):
value = kwargs.get(attr_name, obj.default_value)
elif isinstance(member, dlg_bool_param):
value = get_param_value(attr_name, member.default_value)
if value is not None and value != "":
value = bool(value)
elif isinstance(obj, dlg_int_param):
value = kwargs.get(attr_name, obj.default_value)
elif isinstance(member, dlg_int_param):
value = get_param_value(attr_name, member.default_value)
if value is not None and value != "":
value = int(value)
elif isinstance(obj, dlg_string_param):
value = kwargs.get(attr_name, obj.default_value)
elif isinstance(member, dlg_string_param):
value = get_param_value(attr_name, member.default_value)
if value is not None and value != "":
value = str(value)
elif isinstance(obj, dlg_enum_param):
value = kwargs.get(attr_name, obj.default_value)
elif isinstance(member, dlg_enum_param):
value = get_param_value(attr_name, member.default_value)
if value is not None and value != "":
value = obj.cls(value)
elif isinstance(obj, dlg_list_param):
value = kwargs.get(attr_name, obj.default_value)
value = member.cls(value)
elif isinstance(member, dlg_list_param):
value = get_param_value(attr_name, member.default_value)
if isinstance(value, str):
if value == "":
value = []
Expand All @@ -425,8 +444,8 @@ def getmembers(object, predicate=None):
raise Exception(
f"dlg_list_param {attr_name} is not a list. Type is {type(value)}"
)
elif isinstance(obj, dlg_dict_param):
value = kwargs.get(attr_name, obj.default_value)
elif isinstance(member, dlg_dict_param):
value = get_param_value(attr_name, member.default_value)
if isinstance(value, str):
if value == "":
value = {}
Expand All @@ -443,6 +462,9 @@ def getmembers(object, predicate=None):
setattr(self, attr_name, value)

def _getArg(self, kwargs, key, default):
"""
Pops the specified key arg from kwargs else returns the default
"""
val = default
if key in kwargs:
val = kwargs.pop(key)
Expand Down Expand Up @@ -2288,12 +2310,12 @@ def initialize(self, **kwargs):
# Input and output objects are later referenced by their *index*
# (relative to the order in which they were added to this object)
# Therefore we use an ordered dict to keep the insertion order.
self._inputs = collections.OrderedDict()
self._outputs = collections.OrderedDict()
self._inputs = OrderedDict()
self._outputs = OrderedDict()

# Same as above, only that these correspond to the 'streaming' version
# of the consumers
self._streamingInputs = collections.OrderedDict()
self._streamingInputs = OrderedDict()

# An AppDROP has a second, separate state machine indicating its
# execution status.
Expand Down Expand Up @@ -2353,6 +2375,30 @@ def streamingInputs(self) -> List[DataDROP]:
"""
return list(self._streamingInputs.values())

def _generateNamedInputs(self):
"""
Generates a named mapping of input data drops. Can only be called during run().
"""
named_inputs: OrderedDict[str, DataDROP] = OrderedDict()
if 'inputs' in self.parameters and isinstance(self.parameters['inputs'][0], dict):
for i in range(len(self._inputs)):
key = list(self.parameters['inputs'][i].values())[0]
value = self._inputs[list(self.parameters['inputs'][i].keys())[0]]
named_inputs[key] = value
return named_inputs

def _generateNamedOutputs(self):
"""
Generates a named mapping of output data drops. Can only be called during run().
"""
named_outputs: OrderedDict[str, DataDROP] = OrderedDict()
if 'outputs' in self.parameters and isinstance(self.parameters['outputs'][0], dict):
for i in range(len(self._outputs)):
key = list(self.parameters['outputs'][i].values())[0]
value = self._outputs[list(self.parameters['outputs'][i].keys())[0]]
named_outputs[key] = value
return named_outputs

def handleEvent(self, e):
"""
Handles the arrival of a new event. Events are delivered from those
Expand Down

0 comments on commit 046c159

Please sign in to comment.