Skip to content

Commit

Permalink
Merge b9f5eac into 8635f9e
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed May 26, 2022
2 parents 8635f9e + b9f5eac commit 46e0968
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 67 deletions.
39 changes: 22 additions & 17 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import logging
import pickle

from typing import Callable
from typing import Callable, Optional
import dill
from io import StringIO
from contextlib import redirect_stdout
Expand Down Expand Up @@ -116,10 +116,10 @@ def import_using_code(code):
class DropParser(Enum):
    """Techniques for converting between drop contents and Python objects.

    PICKLE, EVAL and NPY are offered for both input and output ports;
    PATH and DATAURL hand the function a location instead of decoded data
    and are therefore only offered for inputs.
    """

    PICKLE = 'pickle'
    EVAL = 'eval'
    NPY = 'npy'
    #JSON = "json"
    PATH = 'path'  # input only
    DATAURL = 'dataurl'  # input only

##
# @brief PyFuncApp
Expand Down Expand Up @@ -148,9 +148,9 @@ class DropParser(Enum):
# \~English Python function name
# @param[in] aparam/func_code Function Code//String/readwrite/False//False/
# \~English Python function code, e.g. 'def function_name(args): return args'
# @param[in] aparam/input_parser Input Parser/pickle/Select/readwrite/False/pickle,eval,path,dataurl,npy/False/
# @param[in] aparam/input_parser Input Parser/pickle/Select/readwrite/False/pickle,npy,eval,path,dataurl/False/
# \~English Input port parsing technique
# @param[in] aparam/output_parser Output Parser/pickle/Select/readwrite/False/pickle,eval,path,dataurl,npy/False/
# @param[in] aparam/output_parser Output Parser/pickle/Select/readwrite/False/pickle,eval,npy/False/
# \~English output port parsing technique
# @param[in] aparam/func_defaults Function Defaults//String/readwrite/False//False/
# \~English Mapping from argname to default value. Should match only the last part of the argnames list.
Expand Down Expand Up @@ -243,7 +243,7 @@ def _init_func_defaults(self):
+ "{self.f.__name__}: {self.func_defaults}, {type(self.func_defaults)}"
)
raise ValueError
if DropParser(self.input_parser) is DropParser.PICKLE:
if self.input_parser is DropParser.PICKLE:
# only values are pickled, get them unpickled
for name, value in self.func_defaults.items():
self.func_defaults[name] = deserialize_data(value)
Expand Down Expand Up @@ -382,18 +382,21 @@ def run(self):

# Inputs are un-pickled and treated as the arguments of the function
# Their order must be preserved, so we use an OrderedDict
if DropParser(self.input_parser) is DropParser.PICKLE:
all_contents = lambda x: pickle.loads(droputils.allDropContents(x))
elif DropParser(self.input_parser) is DropParser.EVAL:
def astparse(x):
if self.input_parser is DropParser.PICKLE:
#all_contents = lambda x: pickle.loads(droputils.allDropContents(x))
all_contents = droputils.load_pickle
elif self.input_parser is DropParser.EVAL:
def optionalEval(x):
# Null and Empty Drops will return an empty byte string
# which should propagate back to None
content: bytes = droputils.allDropContents(x)
return ast.literal_eval(content.decode('utf-8')) if content else None
all_contents = astparse
elif DropParser(self.input_parser) is DropParser.PATH:
content: str = droputils.allDropContents(x).decode('utf-8')
return ast.literal_eval(content) if len(content) > 0 else None
all_contents = optionalEval
elif self.input_parser is DropParser.NPY:
all_contents = droputils.load_npy
elif self.input_parser is DropParser.PATH:
all_contents = lambda x: x.path
elif DropParser(self.input_parser) is DropParser.DATAURL:
elif self.input_parser is DropParser.DATAURL:
all_contents = lambda x: x.dataurl
else:
raise ValueError(self.input_parser.__repr__())
Expand Down Expand Up @@ -576,11 +579,13 @@ def write_results(self, result):
if len(outputs) == 1:
result = [result]
for r, o in zip(result, outputs):
if DropParser(self.output_parser) is DropParser.PICKLE:
if self.output_parser is DropParser.PICKLE:
logger.debug(f"Writing pickeled result {type(r)} to {o}")
o.write(pickle.dumps(r))
elif DropParser(self.output_parser) is DropParser.EVAL:
elif self.output_parser is DropParser.EVAL:
o.write(repr(r).encode('utf-8'))
elif self.output_parser is DropParser.NPY:
droputils.save_npy(o, r)
else:
ValueError(self.output_parser.__repr__())

Expand Down
46 changes: 27 additions & 19 deletions daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def __exit__(self, typ, value, tb):
)


def allDropContents(drop, bufsize=4096):
def allDropContents(drop, bufsize=4096) -> bytes:
"""
Returns all the data contained in a given DROP
"""
Expand Down Expand Up @@ -267,24 +267,24 @@ def listify(o):
return [o]


# def save_pickle(drop: DataDROP, data: Any):
# """Saves a python object in pkl format"""
# pickle.dump(data, drop)
def save_pickle(drop: DataDROP, data: Any) -> None:
    """Saves a python object in pkl format.

    :param drop: destination of the pickled bytes. It is handed to
        ``pickle.dump`` as the output "file", so it must expose a
        ``write`` method that accepts bytes.
    :param data: the (picklable) object to store.
    """
    pickle.dump(data, drop)


# def load_pickle(drop: DataDROP) -> Any:
# """Loads a pkl formatted data object stored in a DataDROP.
# Note: does not support streaming mode.
# """
# buf = io.BytesIO()
# desc = drop.open()
# while True:
# data = drop.read(desc)
# if not data:
# break
# buf.write(data)
# drop.close(desc)
# return pickle.loads(buf.getbuffer())
def load_pickle(drop: DataDROP) -> Any:
    """Loads a pkl formatted data object stored in a DataDROP.

    Note: does not support streaming mode.

    :param drop: drop to read from. It is opened, read chunk by chunk until
        ``read`` returns an empty (falsy) chunk, and then closed.
    :return: the object reconstructed by ``pickle.loads`` from all the bytes
        that were read.

    Exceptions raised while reading or unpickling propagate to the caller;
    the descriptor is closed in either case.
    """
    buf = io.BytesIO()
    desc = drop.open()
    try:
        while True:
            data = drop.read(desc)
            if not data:
                break
            buf.write(data)
    finally:
        # Release the descriptor even if a read fails part-way through.
        drop.close(desc)
    # NOTE(security): unpickling can execute arbitrary code, so this must
    # only be used on drops whose contents come from a trusted producer.
    return pickle.loads(buf.getbuffer())


# async def save_pickle_iter(drop: DataDROP, data: Iterable[Any]):
Expand All @@ -298,7 +298,7 @@ def listify(o):
# yield pickle.load(p)


def save_numpy(drop: DataDROP, ndarray: np.ndarray, allow_pickle=False):
def save_npy(drop: DataDROP, ndarray: np.ndarray, allow_pickle=False):
"""
Saves a numpy ndarray to a drop in npy format
"""
Expand All @@ -312,7 +312,11 @@ def save_numpy(drop: DataDROP, ndarray: np.ndarray, allow_pickle=False):
dropio.close()


def load_numpy(drop: DataDROP, allow_pickle=False) -> np.ndarray:
def save_numpy(drop: DataDROP, ndarray: np.ndarray, allow_pickle=False):
    """Backward-compatible alias of ``save_npy``.

    Keeps the ``allow_pickle`` keyword that this function accepted before
    it was renamed, so existing callers passing it keep working.

    :param drop: drop the array is written to.
    :param ndarray: array to store in npy format.
    :param allow_pickle: forwarded unchanged to ``save_npy``.
    """
    save_npy(drop, ndarray, allow_pickle=allow_pickle)


def load_npy(drop: DataDROP, allow_pickle=False) -> np.ndarray:
"""
Loads a numpy ndarray from a drop in npy format
"""
Expand All @@ -323,6 +327,10 @@ def load_numpy(drop: DataDROP, allow_pickle=False) -> np.ndarray:
return res


def load_numpy(drop: DataDROP, allow_pickle=False) -> np.ndarray:
    """Backward-compatible alias of ``load_npy``.

    Keeps the ``allow_pickle`` keyword that this function accepted before
    it was renamed, so existing callers passing it keep working.

    :param drop: drop holding npy formatted data.
    :param allow_pickle: forwarded unchanged to ``load_npy``.
    :return: whatever ``load_npy`` returns for ``drop``.
    """
    return load_npy(drop, allow_pickle=allow_pickle)


# def save_jsonp(drop: PathBasedDrop, data: Dict[str, object]):
# with open(drop.path, 'r') as f:
# json.dump(data, f)
Expand Down
71 changes: 66 additions & 5 deletions daliuge-engine/test/apps/test_pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import unittest
import pkg_resources
import json
import numpy

from ..manager import test_dm
from dlg import droputils, graph_loader
Expand Down Expand Up @@ -79,8 +80,6 @@ def _PyFuncApp(oid, uid, f, **kwargs):
func_name=fname,
func_code=fcode,
func_defaults=fdefaults,
input_parser=pyfunc.DropParser.PICKLE,
output_parser=pyfunc.DropParser.PICKLE,
**kwargs
)

Expand Down Expand Up @@ -124,22 +123,84 @@ def inner_function(x, y):

_PyFuncApp("a", "a", inner_function)

def _test_simple_functions(self, f, input_data, output_data):
def test_pickle_func(self, f = lambda x: x, input_data="hello", output_data="hello"):
    """Round-trip pickled data through a PyFuncApp built without explicit parsers.

    ``input_data`` is pickled into the input drop, ``f`` is applied by the
    app, and the unpickled content of the output drop must equal
    ``output_data``.
    """
    source = InMemoryDROP("a", "a")
    app = _PyFuncApp("b", "b", f)
    sink = InMemoryDROP("c", "c")
    app.addInput(source)
    app.addOutput(sink)

    # NOTE(review): the trailing 5 is presumably a timeout for the waiter —
    # confirm against DROPWaiterCtx.
    with DROPWaiterCtx(self, sink, 5):
        droputils.save_pickle(source, input_data)
        source.setCompleted()

    for drop in (source, app, sink):
        self.assertEqual(DROPStates.COMPLETED, drop.status)
    result = droputils.load_pickle(sink)
    self.assertEqual(output_data, result)

def test_eval_func(self, f = lambda x: x, input_data=None, output_data=None):
    """Round-trip ``repr``-encoded data through a PyFuncApp using the EVAL parsers.

    Both ``input_data`` and ``output_data`` default to ``[2,2]`` when not
    given. The input drop receives the UTF-8 encoded ``repr`` of the input,
    and the output drop's content is evaluated and compared to the expected
    value.
    """
    if input_data is None:
        input_data = [2,2]
    if output_data is None:
        output_data = [2,2]

    source = InMemoryDROP("a", "a")
    app = _PyFuncApp(
        "b",
        "b",
        f,
        input_parser=pyfunc.DropParser.EVAL,
        output_parser=pyfunc.DropParser.EVAL,
    )
    sink = InMemoryDROP("c", "c")
    app.addInput(source)
    app.addOutput(sink)

    # NOTE(review): the trailing 5 is presumably a timeout for the waiter —
    # confirm against DROPWaiterCtx.
    with DROPWaiterCtx(self, sink, 5):
        source.write(repr(input_data).encode('utf-8'))
        source.setCompleted()

    for drop in (source, app, sink):
        self.assertEqual(DROPStates.COMPLETED, drop.status)
    # Evaluated with empty globals/locals; the text was produced by the app
    # under test from data supplied by this test.
    written = droputils.allDropContents(sink).decode('utf-8')
    self.assertEqual(output_data, eval(written, {}, {}))

def test_npy_func(self, f = lambda x: x, input_data=None, output_data=None):
    """Round-trip a numpy array through a PyFuncApp using the NPY parsers.

    Both ``input_data`` and ``output_data`` default to a 2x2 array of ones
    when not given. The input is stored with ``droputils.save_npy`` and the
    output is read back with ``droputils.load_npy``.
    """
    if input_data is None:
        input_data = numpy.ones([2,2])
    if output_data is None:
        output_data = numpy.ones([2,2])

    source = InMemoryDROP("a", "a")
    app = _PyFuncApp(
        "b",
        "b",
        f,
        input_parser=pyfunc.DropParser.NPY,
        output_parser=pyfunc.DropParser.NPY,
    )
    sink = InMemoryDROP("c", "c")
    app.addInput(source)
    app.addOutput(sink)

    # NOTE(review): the trailing 5 is presumably a timeout for the waiter —
    # confirm against DROPWaiterCtx.
    with DROPWaiterCtx(self, sink, 5):
        droputils.save_npy(source, input_data)
        source.setCompleted()

    for drop in (source, app, sink):
        self.assertEqual(DROPStates.COMPLETED, drop.status)
    loaded = droputils.load_npy(sink)
    numpy.testing.assert_equal(output_data, loaded)

def _test_simple_functions(self, f, input_data, output_data):
    """Check that ``f`` wrapped in a PyFuncApp maps ``input_data`` to ``output_data``.

    The input is pickled into drop ``a`` exactly once, the app ``b`` runs
    ``f`` on it, and the unpickled content of drop ``c`` is compared with
    the expected output.
    """
    a, c = [InMemoryDROP(x, x) for x in ("a", "c")]
    b = _PyFuncApp("b", "b", f)
    b.addInput(a)
    b.addOutput(c)

    # NOTE(review): the trailing 5 is presumably a timeout for the waiter —
    # confirm against DROPWaiterCtx.
    with DROPWaiterCtx(self, c, 5):
        a.write(pickle.dumps(input_data))
        a.setCompleted()

    for drop in a, b, c:
        self.assertEqual(DROPStates.COMPLETED, drop.status)
    self.assertEqual(
        output_data, pickle.loads(droputils.allDropContents(c))
    )

def test_func1(self):
"""Checks that func1 in this module works when wrapped"""
Expand Down
42 changes: 41 additions & 1 deletion daliuge-engine/test/test_droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
@author: rtobar
"""

import subprocess
import unittest

import numpy

from dlg import droputils
from dlg.common import dropdict, Categories
from dlg.drop import InMemoryDROP, FileDROP, BarrierAppDROP
from dlg.drop import InMemoryDROP, FileDROP, BarrierAppDROP, PlasmaDROP
from dlg.droputils import DROPFile


Expand Down Expand Up @@ -141,6 +144,43 @@ def testGetEndNodes(self):
endNodes = droputils.getLeafNodes(a)
self.assertSetEqual(set([j, f]), set(endNodes))

def _test_datadrop_function(self, test_function, input_data):
    """Run ``test_function(drop_type, input_data)`` for each data drop type.

    InMemoryDROP and FileDROP are exercised directly. For PlasmaDROP a
    ``plasma_store`` process is started first and is always stopped
    afterwards, whether or not the test function succeeds.

    :param test_function: callable taking ``(drop_type, input_data)``.
    :param input_data: payload passed through to ``test_function``.
    """
    # basic datadrop
    for drop_type in (InMemoryDROP, FileDROP):
        test_function(drop_type, input_data)

    # plasma datadrop
    store = None
    try:
        store = subprocess.Popen(
            ["plasma_store", "-m", "1000000", "-s", "/tmp/plasma"]
        )
        test_function(PlasmaDROP, input_data)
    finally:
        if store is not None:
            store.terminate()
            # Reap the child so no zombie process (or ResourceWarning) is
            # left behind; escalate to kill() if it ignores the request.
            try:
                store.wait(timeout=5)
            except subprocess.TimeoutExpired:
                store.kill()
                store.wait()

def _test_save_load_pickle(self, drop_type, data):
    """Save ``data`` as pickle into a new ``drop_type`` drop and read it back.

    The drop is marked completed between saving and loading; the loaded
    object must compare equal to ``data``.
    """
    target = drop_type("a", "a")
    droputils.save_pickle(target, data)
    target.setCompleted()
    self.assertEqual(data, droputils.load_pickle(target))

def test_save_load_pickle(self):
    """Run the pickle save/load round trip with a nested dict for every drop type."""
    self._test_datadrop_function(
        self._test_save_load_pickle,
        {'nested': {'data': {'object': {}}}},
    )

def _test_save_load_npy(self, drop_type, data):
    """Save ``data`` in npy format into a new ``drop_type`` drop and read it back.

    The loaded array must equal ``data`` according to
    ``numpy.testing.assert_equal``.
    """
    target = drop_type("a", "a")
    droputils.save_npy(target, data)
    numpy.testing.assert_equal(data, droputils.load_npy(target))

def test_save_load_npy(self):
    """Run the npy save/load round trip with a 3x5 array of ones for every drop type."""
    self._test_datadrop_function(
        self._test_save_load_npy,
        numpy.ones([3,5]),
    )

def test_DROPFile(self):
"""
This test exercises the DROPFile mechanism to read the data represented by
Expand Down
1 change: 1 addition & 0 deletions docs/development/app_development/app_index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ integration and testing during component development. As mentioned already, for
dynlib_components
docker_components
service_components
pyfunc_components
datadrop_io
wrap_existing
test_and_debug
Expand Down
Loading

0 comments on commit 46e0968

Please sign in to comment.