Skip to content

Commit

Permalink
Merge 2f58663 into 943eb33
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed May 17, 2022
2 parents 943eb33 + 2f58663 commit a3b5881
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 39 deletions.
2 changes: 1 addition & 1 deletion daliuge-common/docker/Dockerfile.devcuda
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ RUN DEBIAN_FRONTEND=noninteractive apt install -y wget gnupg2 software-propertie
RUN mkdir -p /code && cd /code &&\
wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/cuda-ubuntu2004.pin &&\
mv cuda-ubuntu2004.pin /etc/apt/preferences.d/cuda-repository-pin-600 &&\
apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/7fa2af80.pub &&\
apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/3bf863cc.pub &&\
add-apt-repository "deb https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/ /" &&\
apt update

Expand Down
71 changes: 46 additions & 25 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import ast
import base64
import collections
from enum import Enum
import importlib
import inspect
import logging
Expand All @@ -40,6 +41,7 @@
from dlg.meta import (
dlg_bool_param,
dlg_string_param,
dlg_enum_param,
dlg_float_param,
dlg_dict_param,
dlg_component,
Expand Down Expand Up @@ -104,13 +106,21 @@ def import_using_name(app, fname):
)
except AttributeError:
raise InvalidDropException(app, "Module %s has no member %s" % (modname, fname))



def import_using_code(code):
    """
    Rebuild a Python object from its ``dill`` serialisation.

    :param code: serialised payload, handed unchanged to ``dill.loads``
    :return: the object that ``dill.loads`` reconstructs from ``code``
    """
    # NOTE(review): like pickle, dill can execute arbitrary code while
    # deserialising -- confirm that ``code`` only comes from trusted sources.
    loaded = dill.loads(code)
    return loaded


class DropParser(Enum):
    """
    Names of the techniques available for parsing PyFuncApp ports.

    The value of each member is the lower-case string used to select it,
    e.g. ``DropParser("pickle") is DropParser.PICKLE``.
    """

    # drop contents are (de)serialised with pickle
    PICKLE = "pickle"
    # drop contents are treated as a Python literal (ast.literal_eval / repr)
    AST = "ast"
    # the ``path`` attribute of the drop is passed on instead of its contents
    PATH = "path"
    # the ``dataurl`` attribute of the drop is passed on instead of its contents
    DATAURL = "dataurl"
    # NOTE(review): no reader/writer branch for this member is visible in the
    # reviewed part of the file -- confirm where it is handled
    NPY = "npy"
    # JSON = "json"

##
# @brief PyFuncApp
# @details An application that wraps a simple python function.
Expand Down Expand Up @@ -138,8 +148,10 @@ def import_using_code(code):
# \~English Python function name
# @param[in] aparam/func_code Function Code//String/readwrite/False//False/
# \~English Python function code, e.g. 'def function_name(args): return args'
# @param[in] aparam/pickle Pickle/false/Boolean/readwrite/False//False/
# \~English Whether the python arguments are pickled.
# @param[in] aparam/input_parser Input Parser/pickle/Select/readwrite/False/pickle,ast,path,dataurl,npy/False/
# \~English Input port parsing technique
# @param[in] aparam/output_parser Output Parser/pickle/Select/readwrite/False/pickle,ast,path,dataurl,npy/False/
# \~English Output port parsing technique
# @param[in] aparam/func_defaults Function Defaults//String/readwrite/False//False/
# \~English Mapping from argname to default value. Should match only the last part of the argnames list.
# Values are interpreted as Python code literals and that means string values need to be quoted.
Expand Down Expand Up @@ -193,15 +205,11 @@ class PyFuncApp(BarrierAppDROP):
)

func_name = dlg_string_param("func_name", None)

# func_code = dlg_bytes_param("func_code", None) # bytes or base64 string

pickle = dlg_bool_param("pickle", True)

input_parser: DropParser = dlg_enum_param(DropParser, "input_parser", DropParser.PICKLE) # type: ignore
output_parser: DropParser = dlg_enum_param(DropParser, "output_parser", DropParser.PICKLE) # type: ignore
func_arg_mapping = dlg_dict_param("func_arg_mapping", {})

func_defaults = dlg_dict_param("func_defaults", {})

f: Callable
fdefaults: dict

Expand Down Expand Up @@ -230,12 +238,10 @@ def _init_func_defaults(self):
self.func_defaults = self.func_defaults["kwargs"]
# we came all this way, now assume that any resulting dict is correct
if not isinstance(self.func_defaults, dict):
logger.error(
f"Wrong format or type for function defaults for "
+ "{self.f.__name__}: {self.func_defaults}, {type(self.func_defaults)}"
)
logger.error(f"Wrong format or type for function defaults for "+\
"{self.f.__name__}: {self.func_defaults}, {type(self.func_defaults)}")
raise ValueError
if self.pickle:
if DropParser(self.input_parser) is DropParser.PICKLE:
# only values are pickled, get them unpickled
for name, value in self.func_defaults.items():
self.func_defaults[name] = deserialize_data(value)
Expand Down Expand Up @@ -275,6 +281,10 @@ def initialize(self, **kwargs):
"""
BarrierAppDROP.initialize(self, **kwargs)

# TODO: for some reason the literal is never cast?
self.input_parser = DropParser(self.input_parser)
self.output_parser = DropParser(self.output_parser)

self._applicationArgs = self._getArg(kwargs, "applicationArgs", {})

self.func_code = self._getArg(kwargs, "func_code", None)
Expand All @@ -284,7 +294,8 @@ def initialize(self, **kwargs):
"func_code",
"func_name",
"func_arg_mapping",
"pickle",
"input_parser",
"output_parser",
"func_defaults"
]
for kw in self.func_def_keywords:
Expand Down Expand Up @@ -363,7 +374,7 @@ def run(self):
Function arguments in Python can be passed as positional, kw-value, positional
only, kw-value only, and catch-all args and kwargs, which don't provide any
hint about the names of accepted parameters. All of them are now supported. If
positional arguments or kw-value arguments are provided by the user, but are
positional arguments or kw-value arguments are provided by the user, but are
not explicitly defined in the function signature AND args and/or kwargs are
allowed then these arguments are passed to the function. For args this is
somewhat risky, since the order is relevant and in this code derived from the
Expand All @@ -381,12 +392,16 @@ def run(self):

# Inputs are un-pickled and treated as the arguments of the function
# Their order must be preserved, so we use an OrderedDict
if self.pickle:
all_contents = lambda x: pickle.loads(x)
if DropParser(self.input_parser) is DropParser.PICKLE:
all_contents = lambda x: pickle.loads(droputils.allDropContents(x))
elif DropParser(self.input_parser) is DropParser.AST:
all_contents = lambda x: ast.literal_eval(droputils.allDropContents(x).decode('utf-8'))
elif DropParser(self.input_parser) is DropParser.PATH:
all_contents = lambda x: x.path
elif DropParser(self.input_parser) is DropParser.DATAURL:
all_contents = lambda x: x.dataurl
else:
all_contents = lambda x: ast.literal_eval(
droputils.allDropContents(x).decode("utf-8")
)
raise ValueError(self.input_parser.__repr__())

inputs = collections.OrderedDict()
for uid, drop in self._inputs.items():
Expand Down Expand Up @@ -455,6 +470,11 @@ def run(self):
if ptype in ["Complex", "Json"]:
try:
value = ast.literal_eval(value)
except ValueError:
pass
elif ptype in ["Python"]:
try:
value = eval(value)
except:
pass
pargsDict.update({
Expand Down Expand Up @@ -536,12 +556,13 @@ def write_results(self, result):
if len(outputs) == 1:
result = [result]
for r, o in zip(result, outputs):
p = pickle.dumps(r)
if self.pickle:
if DropParser(self.output_parser) is DropParser.PICKLE:
logger.debug(f"Writing pickeled result {type(r)} to {o}")
o.write(pickle.dumps(r)) # @UndefinedVariable
o.write(pickle.dumps(r))
elif DropParser(self.output_parser) is DropParser.AST:
o.write(repr(r).encode('utf-8'))
else:
o.write(repr(r).encode("utf-8"))
ValueError(self.output_parser.__repr__())

def generate_recompute_data(self):
    """
    Expose the recompute data held by this application.

    :return: the current value of ``self._recompute_data``
    """
    recompute_data = self._recompute_data
    return recompute_data
9 changes: 6 additions & 3 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
dlg_int_param,
dlg_list_param,
dlg_string_param,
dlg_enum_param,
dlg_bool_param,
dlg_dict_param,
)
Expand Down Expand Up @@ -409,6 +410,10 @@ def getmembers(object, predicate=None):
value = kwargs.get(attr_name, obj.default_value)
if value is not None and value != "":
value = str(value)
elif isinstance(obj, dlg_enum_param):
value = kwargs.get(attr_name, obj.default_value)
if value is not None and value != "":
value = obj.cls(value)
elif isinstance(obj, dlg_list_param):
value = kwargs.get(attr_name, obj.default_value)
if isinstance(value, str):
Expand All @@ -418,9 +423,7 @@ def getmembers(object, predicate=None):
value = ast.literal_eval(value)
if value is not None and not isinstance(value, list):
raise Exception(
"dlg_list_param {} is not a list. It is a {}".format(
attr_name, type(value)
)
f"dlg_list_param {attr_name} is not a list. Type is {type(value)}"
)
elif isinstance(obj, dlg_dict_param):
value = kwargs.get(attr_name, obj.default_value)
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
# Parameter descriptors: each one is a namedtuple with a ``description`` and
# a ``default_value`` field.
dlg_string_param = collections.namedtuple(
"dlg_string_param", "description default_value"
)
# The enum descriptor additionally carries ``cls`` as its first field: the type
# the raw value is converted to (an Enum subclass in the visible usages).
dlg_enum_param = collections.namedtuple("dlg_enum_param", "cls description default_value")
dlg_list_param = collections.namedtuple("dlg_list_param", "description default_value")
dlg_dict_param = collections.namedtuple("dlg_dict_param", "description default_value")

Expand Down
24 changes: 19 additions & 5 deletions daliuge-engine/docker/Dockerfile.devall
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,24 @@ ENV DLG_ROOT="/tmp/dlg"

RUN apt install -y git python3-dev

RUN pip install git+https://gitlab.com/ska-telescope/sdp/ska-gridder-nifty-cuda.git
RUN pip install dlg-nifty-components

RUN pip install --extra-index-url=https://artefact.skao.int/repository/pypi-internal/simple 'ska-sdp-cbf-emulator[plasma]==1.6.11'
RUN pip install dlg-casacore-components
# Daliuge Modules
ENV PYTHON_PREFIX=/home/callan/dlg/code
ENV PATH=$PATH:$PYTHON_PREFIX/bin

# Nifty
#RUN pip install --prefix=$PYTHON_PREFIX git+https://gitlab.com/ska-telescope/sdp/ska-gridder-nifty-cuda.git
#RUN pip install --prefix=$PYTHON_PREFIX dlg-nifty-components

# Casacore + SDP
#RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple --prefix=$PYTHON_PREFIX ska-sdp-dal-schemas
#RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple --prefix=$PYTHON_PREFIX ska-sdp-realtime-receive-core[plasma]
#RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple --prefix=$PYTHON_PREFIX ska-sdp-realtime-receive-modules[plasma]
#RUN pip install dlg-casacore-components

# RASCIL
# RUN mkdir -p /tmp/rascil_data && cd /tmp/rascil_data &&\
# curl https://ska-telescope.gitlab.io/external/rascil/rascil_data.tgz -o rascil_data.tgz
# RUN tar zxf rascil_data.tgz -C /dlg/lib/python3.8/site-packages
# RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple rascil

CMD ["dlg", "daemon", "-vv"]
5 changes: 3 additions & 2 deletions daliuge-engine/test/apps/test_pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ def _PyFuncApp(oid, uid, f, **kwargs):
func_name=fname,
func_code=fcode,
func_defaults=fdefaults,
pickle=True,
**kwargs,
input_parser=pyfunc.DropParser.PICKLE,
output_parser=pyfunc.DropParser.PICKLE,
**kwargs
)


Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/test/apps/test_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def test_sleepapp(self):
b.addInput(a)
b.addOutput(c)

a = NullDROP("a", "a")
self._test_graph_runs((a, b, c), a, c)

def _test_copyapp_simple(self, app):

Expand Down
70 changes: 68 additions & 2 deletions daliuge-engine/test/test_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#

import contextlib
from enum import Enum
import io
import os, unittest
import random
Expand Down Expand Up @@ -48,9 +49,18 @@
InputFiredAppDROP,
RDBMSDrop,
)
from dlg.meta import (
dlg_float_param,
dlg_string_param,
dlg_enum_param,
dlg_bool_param,
dlg_int_param,
dlg_list_param,
dlg_dict_param
)
from dlg.droputils import DROPWaiterCtx
from dlg.exceptions import InvalidDropException
from dlg.apps.simple import NullBarrierApp, SimpleBranch, SleepAndCopyApp
from dlg.apps.simple import NullBarrierApp, SimpleBranch, SleepAndCopyApp, SleepApp

try:
from crc32c import crc32c
Expand Down Expand Up @@ -92,7 +102,63 @@ def run(self):
outputDrop.write(str(crcSum).encode("utf8"))


class TestDROP(unittest.TestCase):
class TestAppDROP(unittest.TestCase):
    """
    AppDROP related unit tests
    """

    def _test_graph_runs(self, drops, first, last, timeout=1):
        """
        Mark the ``first`` drop(s) as completed inside a ``DROPWaiterCtx``
        created for ``last`` and then check that every drop in ``drops``
        reports the COMPLETED state.
        """
        roots = droputils.listify(first)
        with droputils.DROPWaiterCtx(self, last, timeout):
            for root in roots:
                root.setCompleted()

        for drop in drops:
            self.assertEqual(DROPStates.COMPLETED, drop.status)

    def test_app_param_defaults(self):
        # Declares one class-level parameter per dlg_*_param kind and lets the
        # application check, from within run(), the type and value it sees
        # for each of them when no explicit value is passed to the constructor.
        class MyEnum(Enum):
            DEFAULT = "default"
            ONE = "one"

        class AssertAppDROP(BarrierAppDROP):
            b: bool = dlg_bool_param("b", True)  # type: ignore
            i: int = dlg_int_param("i", 1)  # type: ignore
            f: float = dlg_float_param("f", 2.0)  # type: ignore
            s: str = dlg_string_param("s", "default")  # type: ignore
            e: MyEnum = dlg_enum_param(MyEnum, "e", "default")  # type: ignore
            l: list = dlg_list_param("l", [])  # type: ignore
            l2: list = dlg_list_param("l2", "[]")  # type: ignore
            d: dict = dlg_dict_param("d", {})  # type: ignore
            d2: dict = dlg_dict_param("d2", "{}")  # type: ignore

            def run(self):
                # scalar parameters
                assert isinstance(self.b, bool)
                assert self.b is True
                assert isinstance(self.i, int)
                assert self.i == 1
                assert isinstance(self.f, float)
                assert self.f == 2.0
                assert isinstance(self.s, str)
                assert self.s == "default"

                # the enum default is given as a plain string
                assert isinstance(self.e, MyEnum)
                assert self.e is MyEnum("default")

                # containers, declared both as objects and as string literals
                assert isinstance(self.l, list)
                assert self.l == []
                assert isinstance(self.l2, list)
                assert self.l2 == []
                assert isinstance(self.d, dict)
                assert self.d == {}
                assert isinstance(self.d2, dict)
                assert self.d2 == {}

        # Minimal graph: data drop -> asserting app -> data drop
        source = NullDROP("a", "a")
        app = AssertAppDROP("b", "b")
        sink = NullDROP("c", "c")
        app.addInput(source)
        app.addOutput(sink)

        self._test_graph_runs((source, app, sink), source, sink)


class TestDataDROP(unittest.TestCase):
"""
DataDROP related unit tests
"""
Expand Down

0 comments on commit a3b5881

Please sign in to comment.