Skip to content

Commit

Permalink
Merge 3a5e99c into a055143
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed May 20, 2022
2 parents a055143 + 3a5e99c commit 61ccfe5
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 67 deletions.
2 changes: 1 addition & 1 deletion daliuge-common/docker/Dockerfile.devcuda
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ RUN DEBIAN_FRONTEND=noninteractive apt install -y wget gnupg2 software-propertie
RUN mkdir -p /code && cd /code &&\
wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/cuda-ubuntu2004.pin &&\
mv cuda-ubuntu2004.pin /etc/apt/preferences.d/cuda-repository-pin-600 &&\
apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/7fa2af80.pub &&\
apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/3bf863cc.pub &&\
add-apt-repository "deb https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/ /" &&\
apt update

Expand Down
63 changes: 41 additions & 22 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import ast
import base64
import collections
from enum import Enum
import importlib
import inspect
import logging
Expand All @@ -40,6 +41,7 @@
from dlg.meta import (
dlg_bool_param,
dlg_string_param,
dlg_enum_param,
dlg_float_param,
dlg_dict_param,
dlg_component,
Expand Down Expand Up @@ -104,13 +106,21 @@ def import_using_name(app, fname):
)
except AttributeError:
raise InvalidDropException(app, "Module %s has no member %s" % (modname, fname))



def import_using_code(code):
return dill.loads(code)


class DropParser(Enum):
PICKLE = 'pickle'
EVAL = 'eval'
PATH = 'path'
DATAURL = 'dataurl'
NPY = 'npy'
#JSON = "json"

##
# @brief PyFuncApp
# @details An application that wraps a simple python function.
Expand Down Expand Up @@ -138,8 +148,10 @@ def import_using_code(code):
# \~English Python function name
# @param[in] aparam/func_code Function Code//String/readwrite/False//False/
# \~English Python function code, e.g. 'def function_name(args): return args'
# @param[in] aparam/pickle Pickle/false/Boolean/readwrite/False//False/
# \~English Whether the python arguments are pickled.
# @param[in] aparam/input_parser Input Parser/pickle/Select/readwrite/False/pickle,eval,path,dataurl,npy/False/
# \~English Input port parsing technique
# @param[in] aparam/output_parser Output Parser/pickle/Select/readwrite/False/pickle,eval,path,dataurl,npy/False/
# \~English output port parsing technique
# @param[in] aparam/func_defaults Function Defaults//String/readwrite/False//False/
# \~English Mapping from argname to default value. Should match only the last part of the argnames list.
# Values are interpreted as Python code literals and that means string values need to be quoted.
Expand Down Expand Up @@ -193,15 +205,11 @@ class PyFuncApp(BarrierAppDROP):
)

func_name = dlg_string_param("func_name", None)

# func_code = dlg_bytes_param("func_code", None) # bytes or base64 string

pickle = dlg_bool_param("pickle", True)

input_parser: DropParser = dlg_enum_param(DropParser, "input_parser", DropParser.PICKLE) # type: ignore
output_parser: DropParser = dlg_enum_param(DropParser, "output_parser", DropParser.PICKLE) # type: ignore
func_arg_mapping = dlg_dict_param("func_arg_mapping", {})

func_defaults = dlg_dict_param("func_defaults", {})

f: Callable
fdefaults: dict

Expand Down Expand Up @@ -235,7 +243,7 @@ def _init_func_defaults(self):
+ "{self.f.__name__}: {self.func_defaults}, {type(self.func_defaults)}"
)
raise ValueError
if self.pickle:
if DropParser(self.input_parser) is DropParser.PICKLE:
# only values are pickled, get them unpickled
for name, value in self.func_defaults.items():
self.func_defaults[name] = deserialize_data(value)
Expand Down Expand Up @@ -284,7 +292,8 @@ def initialize(self, **kwargs):
"func_code",
"func_name",
"func_arg_mapping",
"pickle",
"input_parser",
"output_parser",
"func_defaults"
]
for kw in self.func_def_keywords:
Expand Down Expand Up @@ -363,7 +372,7 @@ def run(self):
Function arguments in Python can be passed as positional, kw-value, positional
only, kw-value only, and catch-all args and kwargs, which don't provide any
hint about the names of accepted parameters. All of them are now supported. If
positional arguments or kw-value arguments are provided by the user, but are
positional arguments or kw-value arguments are provided by the user, but are
not explicitely defined in the function signiture AND args and/or kwargs are
allowed then these arguments are passed to the function. For args this is
somewhat risky, since the order is relevant and in this code derived from the
Expand All @@ -381,12 +390,16 @@ def run(self):

# Inputs are un-pickled and treated as the arguments of the function
# Their order must be preserved, so we use an OrderedDict
if self.pickle:
if DropParser(self.input_parser) is DropParser.PICKLE:
all_contents = lambda x: pickle.loads(x)
elif DropParser(self.input_parser) is DropParser.EVAL:
all_contents = lambda x: ast.literal_eval(droputils.allDropContents(x).decode('utf-8'))
elif DropParser(self.input_parser) is DropParser.PATH:
all_contents = lambda x: x.path
elif DropParser(self.input_parser) is DropParser.DATAURL:
all_contents = lambda x: x.dataurl
else:
all_contents = lambda x: ast.literal_eval(
droputils.allDropContents(x).decode("utf-8")
)
raise ValueError(self.input_parser.__repr__())

inputs = collections.OrderedDict()
for uid, drop in self._inputs.items():
Expand Down Expand Up @@ -455,15 +468,20 @@ def run(self):
if ptype in ["Complex", "Json"]:
try:
value = ast.literal_eval(value)
except ValueError:
pass
elif ptype in ["Python"]:
try:
value = eval(value)
except:
pass
pargsDict.update({
pa:
value
})
elif pa != 'self' and pa not in pargsDict:
elif pa != 'self':
logger.warning(f"Required positional argument '{pa}' not found!")
logger.debug(f"updating posargs with {list(pargsDict.values())}")
logger.debug(f"updating posargs with {list(kwargs.values())}")
self.pargs.extend(list(pargsDict.values()))

# Try to get values for still missing kwargs arguments from Application kws
Expand Down Expand Up @@ -536,12 +554,13 @@ def write_results(self, result):
if len(outputs) == 1:
result = [result]
for r, o in zip(result, outputs):
p = pickle.dumps(r)
if self.pickle:
if DropParser(self.output_parser) is DropParser.PICKLE:
logger.debug(f"Writing pickeled result {type(r)} to {o}")
o.write(pickle.dumps(r)) # @UndefinedVariable
o.write(pickle.dumps(r))
elif DropParser(self.output_parser) is DropParser.EVAL:
o.write(repr(r).encode('utf-8'))
else:
o.write(repr(r).encode("utf-8"))
ValueError(self.output_parser.__repr__())

def generate_recompute_data(self):
return self._recompute_data
91 changes: 69 additions & 22 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from abc import ABCMeta, abstractmethod, abstractproperty
import ast
import base64
import collections
from collections import OrderedDict
import contextlib
import errno
import heapq
Expand Down Expand Up @@ -111,6 +111,7 @@
dlg_int_param,
dlg_list_param,
dlg_string_param,
dlg_enum_param,
dlg_bool_param,
dlg_dict_param,
)
Expand Down Expand Up @@ -209,6 +210,8 @@ def __init__(self, oid, uid, **kwargs):

super(AbstractDROP, self).__init__()

# Useful to have access to all EAGLE parameters without a priori knowledge
self._parameters = dict(kwargs)
self._extract_attributes(**kwargs)

# Copy it since we're going to modify it
Expand Down Expand Up @@ -372,8 +375,6 @@ def __init__(self, oid, uid, **kwargs):
# All DROPs are precious unless stated otherwise; used for replication
self._precious = self._getArg(kwargs, "precious", True)

# Useful to have access to all EAGLE parameters without a priori knowledge
self._parameters = dict(kwargs)
self.autofill_environment_variables()
kwargs.update(self._parameters)
# Sub-class initialization; mark ourselves as INITIALIZED after that
Expand All @@ -383,47 +384,69 @@ def __init__(self, oid, uid, **kwargs):
) # no need to use synchronised self.status here

def _extract_attributes(self, **kwargs):
"""
Extracts component and app params then assigns them to class instance attributes.
Component params take pro
"""
def getmembers(object, predicate=None):
for cls in object.__class__.__mro__[:-1]:
for k, v in vars(cls).items():
if not predicate or predicate(v):
yield k, v

def get_param_value(attr_name, default_value):
has_component_param = attr_name in kwargs
has_app_param = hasattr(self, 'parameters') \
and 'applicationArgs' in self.parameters \
and attr_name in self.parameters['applicationArgs']

if has_component_param and has_app_param:
logger.warning(f"Drop has both component and app param {attr_name}. Using component param.")
if has_component_param:
param = kwargs.get(attr_name)
elif has_app_param:
param = self.parameters['applicationArgs'].get(attr_name).value
else:
param = default_value
return param

# Take a class dlg defined parameter class attribute and create an instanced attribute on object
for attr_name, obj in getmembers(
for attr_name, member in getmembers(
self, lambda a: not (inspect.isfunction(a) or isinstance(a, property))
):
if isinstance(obj, dlg_float_param):
value = kwargs.get(attr_name, obj.default_value)
if isinstance(member, dlg_float_param):
value = get_param_value(attr_name, member.default_value)
if value is not None and value != "":
value = float(value)
elif isinstance(obj, dlg_bool_param):
value = kwargs.get(attr_name, obj.default_value)
elif isinstance(member, dlg_bool_param):
value = get_param_value(attr_name, member.default_value)
if value is not None and value != "":
value = bool(value)
elif isinstance(obj, dlg_int_param):
value = kwargs.get(attr_name, obj.default_value)
elif isinstance(member, dlg_int_param):
value = get_param_value(attr_name, member.default_value)
if value is not None and value != "":
value = int(value)
elif isinstance(obj, dlg_string_param):
value = kwargs.get(attr_name, obj.default_value)
elif isinstance(member, dlg_string_param):
value = get_param_value(attr_name, member.default_value)
if value is not None and value != "":
value = str(value)
elif isinstance(obj, dlg_list_param):
value = kwargs.get(attr_name, obj.default_value)
elif isinstance(member, dlg_enum_param):
value = get_param_value(attr_name, member.default_value)
if value is not None and value != "":
value = member.cls(value)
elif isinstance(member, dlg_list_param):
value = get_param_value(attr_name, member.default_value)
if isinstance(value, str):
if value == "":
value = []
else:
value = ast.literal_eval(value)
if value is not None and not isinstance(value, list):
raise Exception(
"dlg_list_param {} is not a list. It is a {}".format(
attr_name, type(value)
)
f"dlg_list_param {attr_name} is not a list. Type is {type(value)}"
)
elif isinstance(obj, dlg_dict_param):
value = kwargs.get(attr_name, obj.default_value)
elif isinstance(member, dlg_dict_param):
value = get_param_value(attr_name, member.default_value)
if isinstance(value, str):
if value == "":
value = {}
Expand Down Expand Up @@ -2285,12 +2308,12 @@ def initialize(self, **kwargs):
# Input and output objects are later referenced by their *index*
# (relative to the order in which they were added to this object)
# Therefore we use an ordered dict to keep the insertion order.
self._inputs = collections.OrderedDict()
self._outputs = collections.OrderedDict()
self._inputs = OrderedDict()
self._outputs = OrderedDict()

# Same as above, only that these correspond to the 'streaming' version
# of the consumers
self._streamingInputs = collections.OrderedDict()
self._streamingInputs = OrderedDict()

# An AppDROP has a second, separate state machine indicating its
# execution status.
Expand Down Expand Up @@ -2350,6 +2373,30 @@ def streamingInputs(self) -> List[DataDROP]:
"""
return list(self._streamingInputs.values())

def _generateNamedInputs(self):
"""
Generates a named mapping of input data drops. Can only be called during run().
"""
named_inputs: OrderedDict[str, DataDROP] = OrderedDict()
if 'inputs' in self.parameters and isinstance(self.parameters['inputs'][0], dict):
for i in range(len(self._inputs)):
key = list(self.parameters['inputs'][i].values())[0]
value = self._inputs[list(self.parameters['inputs'][i].keys())[0]]
named_inputs[key] = value
return named_inputs

def _generateNamedOutputs(self):
"""
Generates a named mapping of output data drops. Can only be called during run().
"""
named_outputs: OrderedDict[str, DataDROP] = OrderedDict()
if 'outputs' in self.parameters and isinstance(self.parameters['outputs'][0], dict):
for i in range(len(self._outputs)):
key = list(self.parameters['outputs'][i].values())[0]
value = self._outputs[list(self.parameters['outputs'][i].keys())[0]]
named_outputs[key] = value
return named_outputs

def handleEvent(self, e):
"""
Handles the arrival of a new event. Events are delivered from those
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
dlg_string_param = collections.namedtuple(
"dlg_string_param", "description default_value"
)
dlg_enum_param = collections.namedtuple("dlg_enum_param", "cls description default_value")
dlg_list_param = collections.namedtuple("dlg_list_param", "description default_value")
dlg_dict_param = collections.namedtuple("dlg_dict_param", "description default_value")

Expand Down
20 changes: 15 additions & 5 deletions daliuge-engine/docker/Dockerfile.devall
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,20 @@ ENV DLG_ROOT="/tmp/dlg"

RUN apt install -y git python3-dev

RUN pip install git+https://gitlab.com/ska-telescope/sdp/ska-gridder-nifty-cuda.git
RUN pip install dlg-nifty-components>=1.1.0

RUN pip install 'ska-sdp-realtime-receive-modules[plasma]==2.0.1' 'ska-sdp-cbf-emulator==2.0.1' --extra-index-url=https://artefact.skao.int/repository/pypi-internal/simple
RUN pip install dlg-casacore-components>=0.3.0
# Nifty
#RUN pip install --prefix=$PYTHON_PREFIX git+https://gitlab.com/ska-telescope/sdp/ska-gridder-nifty-cuda.git
#RUN pip install --prefix=$PYTHON_PREFIX dlg-nifty-components

# Casacore + SDP
#RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple --prefix=$PYTHON_PREFIX ska-sdp-dal-schemas
#RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple --prefix=$PYTHON_PREFIX ska-sdp-realtime-receive-core[plasma]
#RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple --prefix=$PYTHON_PREFIX ska-sdp-realtime-receive-modules[plasma]
#RUN pip install dlg-casacore-components

# RASCIL
# RUN mkdir -p /tmp/rascil_data && cd /tmp/rascil_data &&\
# curl https://ska-telescope.gitlab.io/external/rascil/rascil_data.tgz -o rascil_data.tgz
# RUN tar zxf rascil_data.tgz -C /dlg/lib/python3.8/site-packages
# RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple rascil

CMD ["dlg", "daemon", "-vv"]
5 changes: 3 additions & 2 deletions daliuge-engine/test/apps/test_pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ def _PyFuncApp(oid, uid, f, **kwargs):
func_name=fname,
func_code=fcode,
func_defaults=fdefaults,
pickle=True,
**kwargs,
input_parser=pyfunc.DropParser.PICKLE,
output_parser=pyfunc.DropParser.PICKLE,
**kwargs
)


Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/test/apps/test_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def test_sleepapp(self):
b.addInput(a)
b.addOutput(c)

a = NullDROP("a", "a")
self._test_graph_runs((a, b, c), a, c)

def _test_copyapp_simple(self, app):

Expand Down
Loading

0 comments on commit 61ccfe5

Please sign in to comment.