Skip to content

Commit

Permalink
Merge 178c206 into 1387b85
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed May 18, 2022
2 parents 1387b85 + 178c206 commit 20eae16
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 46 deletions.
2 changes: 1 addition & 1 deletion daliuge-common/docker/Dockerfile.devcuda
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ RUN DEBIAN_FRONTEND=noninteractive apt install -y wget gnupg2 software-propertie
RUN mkdir -p /code && cd /code &&\
wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/cuda-ubuntu2004.pin &&\
mv cuda-ubuntu2004.pin /etc/apt/preferences.d/cuda-repository-pin-600 &&\
apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/7fa2af80.pub &&\
apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/3bf863cc.pub &&\
add-apt-repository "deb https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/ /" &&\
apt update

Expand Down
59 changes: 39 additions & 20 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import ast
import base64
import collections
from enum import Enum
import importlib
import inspect
import logging
Expand All @@ -40,6 +41,7 @@
from dlg.meta import (
dlg_bool_param,
dlg_string_param,
dlg_enum_param,
dlg_float_param,
dlg_dict_param,
dlg_component,
Expand Down Expand Up @@ -104,13 +106,21 @@ def import_using_name(app, fname):
)
except AttributeError:
raise InvalidDropException(app, "Module %s has no member %s" % (modname, fname))



def import_using_code(code):
return dill.loads(code)


class DropParser(Enum):
PICKLE = 'pickle'
EVAL = 'evel'
PATH = 'path'
DATAURL = 'dataurl'
NPY = 'npy'
#JSON = "json"

##
# @brief PyFuncApp
# @details An application that wraps a simple python function.
Expand Down Expand Up @@ -138,8 +148,10 @@ def import_using_code(code):
# \~English Python function name
# @param[in] aparam/func_code Function Code//String/readwrite/False//False/
# \~English Python function code, e.g. 'def function_name(args): return args'
# @param[in] aparam/pickle Pickle/false/Boolean/readwrite/False//False/
# \~English Whether the python arguments are pickled.
# @param[in] aparam/input_parser Input Parser/pickle/Select/readwrite/False/pickle,eval,path,dataurl,npy/False/
# \~English Input port parsing technique
# @param[in] aparam/output_parser Output Parser/pickle/Select/readwrite/False/pickle,eval,path,dataurl,npy/False/
# \~English output port parsing technique
# @param[in] aparam/func_defaults Function Defaults//String/readwrite/False//False/
# \~English Mapping from argname to default value. Should match only the last part of the argnames list.
# Values are interpreted as Python code literals and that means string values need to be quoted.
Expand Down Expand Up @@ -193,15 +205,11 @@ class PyFuncApp(BarrierAppDROP):
)

func_name = dlg_string_param("func_name", None)

# func_code = dlg_bytes_param("func_code", None) # bytes or base64 string

pickle = dlg_bool_param("pickle", True)

input_parser: DropParser = dlg_enum_param(DropParser, "input_parser", DropParser.PICKLE) # type: ignore
output_parser: DropParser = dlg_enum_param(DropParser, "output_parser", DropParser.PICKLE) # type: ignore
func_arg_mapping = dlg_dict_param("func_arg_mapping", {})

func_defaults = dlg_dict_param("func_defaults", {})

f: Callable
fdefaults: dict

Expand Down Expand Up @@ -235,7 +243,7 @@ def _init_func_defaults(self):
+ "{self.f.__name__}: {self.func_defaults}, {type(self.func_defaults)}"
)
raise ValueError
if self.pickle:
if DropParser(self.input_parser) is DropParser.PICKLE:
# only values are pickled, get them unpickled
for name, value in self.func_defaults.items():
self.func_defaults[name] = deserialize_data(value)
Expand Down Expand Up @@ -284,7 +292,8 @@ def initialize(self, **kwargs):
"func_code",
"func_name",
"func_arg_mapping",
"pickle",
"input_parser",
"output_parser",
"func_defaults"
]
for kw in self.func_def_keywords:
Expand Down Expand Up @@ -363,7 +372,7 @@ def run(self):
Function arguments in Python can be passed as positional, kw-value, positional
only, kw-value only, and catch-all args and kwargs, which don't provide any
hint about the names of accepted parameters. All of them are now supported. If
positional arguments or kw-value arguments are provided by the user, but are
positional arguments or kw-value arguments are provided by the user, but are
not explicitely defined in the function signiture AND args and/or kwargs are
allowed then these arguments are passed to the function. For args this is
somewhat risky, since the order is relevant and in this code derived from the
Expand All @@ -381,12 +390,16 @@ def run(self):

# Inputs are un-pickled and treated as the arguments of the function
# Their order must be preserved, so we use an OrderedDict
if self.pickle:
if DropParser(self.input_parser) is DropParser.PICKLE:
all_contents = lambda x: pickle.loads(x)
elif DropParser(self.input_parser) is DropParser.EVAL:
all_contents = lambda x: ast.literal_eval(droputils.allDropContents(x).decode('utf-8'))
elif DropParser(self.input_parser) is DropParser.PATH:
all_contents = lambda x: x.path
elif DropParser(self.input_parser) is DropParser.DATAURL:
all_contents = lambda x: x.dataurl
else:
all_contents = lambda x: ast.literal_eval(
droputils.allDropContents(x).decode("utf-8")
)
raise ValueError(self.input_parser.__repr__())

inputs = collections.OrderedDict()
for uid, drop in self._inputs.items():
Expand Down Expand Up @@ -455,6 +468,11 @@ def run(self):
if ptype in ["Complex", "Json"]:
try:
value = ast.literal_eval(value)
except ValueError:
pass
elif ptype in ["Python"]:
try:
value = eval(value)
except:
pass
pargsDict.update({
Expand Down Expand Up @@ -536,12 +554,13 @@ def write_results(self, result):
if len(outputs) == 1:
result = [result]
for r, o in zip(result, outputs):
p = pickle.dumps(r)
if self.pickle:
if DropParser(self.output_parser) is DropParser.PICKLE:
logger.debug(f"Writing pickeled result {type(r)} to {o}")
o.write(pickle.dumps(r)) # @UndefinedVariable
o.write(pickle.dumps(r))
elif DropParser(self.output_parser) is DropParser.EVAL:
o.write(repr(r).encode('utf-8'))
else:
o.write(repr(r).encode("utf-8"))
ValueError(self.output_parser.__repr__())

def generate_recompute_data(self):
return self._recompute_data
9 changes: 6 additions & 3 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
dlg_int_param,
dlg_list_param,
dlg_string_param,
dlg_enum_param,
dlg_bool_param,
dlg_dict_param,
)
Expand Down Expand Up @@ -409,6 +410,10 @@ def getmembers(object, predicate=None):
value = kwargs.get(attr_name, obj.default_value)
if value is not None and value != "":
value = str(value)
elif isinstance(obj, dlg_enum_param):
value = kwargs.get(attr_name, obj.default_value)
if value is not None and value != "":
value = obj.cls(value)
elif isinstance(obj, dlg_list_param):
value = kwargs.get(attr_name, obj.default_value)
if isinstance(value, str):
Expand All @@ -418,9 +423,7 @@ def getmembers(object, predicate=None):
value = ast.literal_eval(value)
if value is not None and not isinstance(value, list):
raise Exception(
"dlg_list_param {} is not a list. It is a {}".format(
attr_name, type(value)
)
f"dlg_list_param {attr_name} is not a list. Type is {type(value)}"
)
elif isinstance(obj, dlg_dict_param):
value = kwargs.get(attr_name, obj.default_value)
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
dlg_string_param = collections.namedtuple(
"dlg_string_param", "description default_value"
)
dlg_enum_param = collections.namedtuple("dlg_enum_param", "cls description default_value")
dlg_list_param = collections.namedtuple("dlg_list_param", "description default_value")
dlg_dict_param = collections.namedtuple("dlg_dict_param", "description default_value")

Expand Down
24 changes: 19 additions & 5 deletions daliuge-engine/docker/Dockerfile.devall
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,24 @@ ENV DLG_ROOT="/tmp/dlg"

RUN apt install -y git python3-dev

RUN pip install git+https://gitlab.com/ska-telescope/sdp/ska-gridder-nifty-cuda.git
RUN pip install dlg-nifty-components

RUN pip install --extra-index-url=https://artefact.skao.int/repository/pypi-internal/simple 'ska-sdp-cbf-emulator[plasma]==1.6.11'
RUN pip install dlg-casacore-components
# Daliuge Modules
ENV PYTHON_PREFIX=/home/callan/dlg/code
ENV PATH=$PATH:$PYTHON_PREFIX/bin

# Nifty
#RUN pip install --prefix=$PYTHON_PREFIX git+https://gitlab.com/ska-telescope/sdp/ska-gridder-nifty-cuda.git
#RUN pip install --prefix=$PYTHON_PREFIX dlg-nifty-components

# Casacore + SDP
#RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple --prefix=$PYTHON_PREFIX ska-sdp-dal-schemas
#RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple --prefix=$PYTHON_PREFIX ska-sdp-realtime-receive-core[plasma]
#RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple --prefix=$PYTHON_PREFIX ska-sdp-realtime-receive-modules[plasma]
#RUN pip install dlg-casacore-components

# RASCIL
# RUN mkdir -p /tmp/rascil_data && cd /tmp/rascil_data &&\
# curl https://ska-telescope.gitlab.io/external/rascil/rascil_data.tgz -o rascil_data.tgz
# RUN tar zxf rascil_data.tgz -C /dlg/lib/python3.8/site-packages
# RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple rascil

CMD ["dlg", "daemon", "-vv"]
5 changes: 3 additions & 2 deletions daliuge-engine/test/apps/test_pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ def _PyFuncApp(oid, uid, f, **kwargs):
func_name=fname,
func_code=fcode,
func_defaults=fdefaults,
pickle=True,
**kwargs,
input_parser=pyfunc.DropParser.PICKLE,
output_parser=pyfunc.DropParser.PICKLE,
**kwargs
)


Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/test/apps/test_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def test_sleepapp(self):
b.addInput(a)
b.addOutput(c)

a = NullDROP("a", "a")
self._test_graph_runs((a, b, c), a, c)

def _test_copyapp_simple(self, app):

Expand Down
37 changes: 30 additions & 7 deletions daliuge-engine/test/graphs/funcTestPG_namedPorts.graph
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,38 @@
"options": [],
"positional": false
},
"pickle": {
"text": "Pickle",
"value": false,
"defaultValue": "",
"description": "Whether the python arguments are pickled.",
"input_parser": {
"text": "Input Parser",
"value": "eval",
"defaultValue": "pickle",
"description": "Input port parsing technique",
"readonly": false,
"type": "Boolean",
"type": "Select",
"precious": false,
"options": [],
"options": [
"pickle",
"eval",
"path",
"dataurl",
"npy"
],
"positional": false
},
"output_parser": {
"text": "Output Parser",
"value": "eval",
"defaultValue": "pickle",
"description": "Output port parsing technique",
"readonly": false,
"type": "Select",
"precious": false,
"options": [
"pickle",
"eval",
"path",
"dataurl",
"npy"
],
"positional": false
},
"func_defaults": {
Expand Down
37 changes: 30 additions & 7 deletions daliuge-engine/test/graphs/pyfunc_glob_testPG.graph
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,38 @@
"options": [],
"positional": false
},
"pickle": {
"text": "Pickle",
"value": false,
"defaultValue": "",
"description": "Whether the python arguments are pickled.",
"input_parser": {
"text": "Input Parser",
"value": "eval",
"defaultValue": "pickle",
"description": "Input port parsing technique",
"readonly": false,
"type": "Boolean",
"type": "Select",
"precious": false,
"options": [],
"options": [
"pickle",
"eval",
"path",
"dataurl",
"npy"
],
"positional": false
},
"output_parser": {
"text": "Output Parser",
"value": "eval",
"defaultValue": "pickle",
"description": "Output port parsing technique",
"readonly": false,
"type": "Select",
"precious": false,
"options": [
"pickle",
"eval",
"path",
"dataurl",
"npy"
],
"positional": false
},
"func_defaults": {
Expand Down

0 comments on commit 20eae16

Please sign in to comment.