Merge c5692b5 into a47cb17
calgray committed May 25, 2022
2 parents a47cb17 + c5692b5 commit 00219a9
Showing 8 changed files with 184 additions and 66 deletions.
39 changes: 22 additions & 17 deletions daliuge-engine/dlg/apps/pyfunc.py
@@ -30,7 +30,7 @@
import logging
import pickle

-from typing import Callable
+from typing import Callable, Optional
import dill
from io import StringIO
from contextlib import redirect_stdout
@@ -116,10 +116,10 @@ def import_using_code(code):
class DropParser(Enum):
    PICKLE = 'pickle'
    EVAL = 'eval'
-    PATH = 'path'
-    DATAURL = 'dataurl'
+    NPY = 'npy'
+    #JSON = "json"
+    PATH = 'path' # input only
+    DATAURL = 'dataurl' # input only

##
# @brief PyFuncApp
@@ -148,9 +148,9 @@ class DropParser(Enum):
# \~English Python function name
# @param[in] aparam/func_code Function Code//String/readwrite/False//False/
# \~English Python function code, e.g. 'def function_name(args): return args'
-# @param[in] aparam/input_parser Input Parser/pickle/Select/readwrite/False/pickle,eval,path,dataurl,npy/False/
+# @param[in] aparam/input_parser Input Parser/pickle/Select/readwrite/False/pickle,npy,eval,path,dataurl/False/
# \~English Input port parsing technique
-# @param[in] aparam/output_parser Output Parser/pickle/Select/readwrite/False/pickle,eval,path,dataurl,npy/False/
+# @param[in] aparam/output_parser Output Parser/pickle/Select/readwrite/False/pickle,eval,npy/False/
# \~English output port parsing technique
# @param[in] aparam/func_defaults Function Defaults//String/readwrite/False//False/
# \~English Mapping from argname to default value. Should match only the last part of the argnames list.
@@ -243,7 +243,7 @@ def _init_func_defaults(self):
+ "{self.f.__name__}: {self.func_defaults}, {type(self.func_defaults)}"
)
raise ValueError
if DropParser(self.input_parser) is DropParser.PICKLE:
if self.input_parser is DropParser.PICKLE:
# only values are pickled, get them unpickled
for name, value in self.func_defaults.items():
self.func_defaults[name] = deserialize_data(value)
@@ -382,18 +382,21 @@ def run(self):

        # Inputs are un-pickled and treated as the arguments of the function
        # Their order must be preserved, so we use an OrderedDict
-        if DropParser(self.input_parser) is DropParser.PICKLE:
-            all_contents = lambda x: pickle.loads(droputils.allDropContents(x))
-        elif DropParser(self.input_parser) is DropParser.EVAL:
-            def astparse(x):
+        if self.input_parser is DropParser.PICKLE:
+            #all_contents = lambda x: pickle.loads(droputils.allDropContents(x))
+            all_contents = droputils.load_pickle
+        elif self.input_parser is DropParser.EVAL:
+            def optionalEval(x):
                # Null and Empty Drops will return an empty byte string
                # which should propagate back to None
-                content: bytes = droputils.allDropContents(x)
-                return ast.literal_eval(content.decode('utf-8')) if content else None
-            all_contents = astparse
-        elif DropParser(self.input_parser) is DropParser.PATH:
+                content: str = droputils.allDropContents(x).decode('utf-8')
+                return ast.literal_eval(content) if len(content) > 0 else None
+            all_contents = optionalEval
+        elif self.input_parser is DropParser.NPY:
+            all_contents = droputils.load_npy
+        elif self.input_parser is DropParser.PATH:
            all_contents = lambda x: x.path
-        elif DropParser(self.input_parser) is DropParser.DATAURL:
+        elif self.input_parser is DropParser.DATAURL:
            all_contents = lambda x: x.dataurl
        else:
            raise ValueError(self.input_parser.__repr__())
@@ -547,11 +550,13 @@ def write_results(self, result):
        if len(outputs) == 1:
            result = [result]
        for r, o in zip(result, outputs):
-            if DropParser(self.output_parser) is DropParser.PICKLE:
+            if self.output_parser is DropParser.PICKLE:
+                logger.debug(f"Writing pickled result {type(r)} to {o}")
                o.write(pickle.dumps(r))
-            elif DropParser(self.output_parser) is DropParser.EVAL:
+            elif self.output_parser is DropParser.EVAL:
                o.write(repr(r).encode('utf-8'))
+            elif self.output_parser is DropParser.NPY:
+                droputils.save_npy(o, r)
            else:
                raise ValueError(self.output_parser.__repr__())

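For reference, the two EVAL branches above form a matched pair: write_results() emits repr(value), and optionalEval() reads it back with ast.literal_eval(), mapping empty drop content to None. A standalone sketch of that contract (editor's illustration with hypothetical helper names; no DROPs involved, and it only holds for Python literals):

    import ast

    def eval_write(value) -> bytes:
        # what write_results() does for DropParser.EVAL
        return repr(value).encode('utf-8')

    def eval_read(content: bytes):
        # what optionalEval() does: empty content becomes None
        text = content.decode('utf-8')
        return ast.literal_eval(text) if len(text) > 0 else None

    assert eval_read(eval_write([2, 2])) == [2, 2]
    assert eval_read(b'') is None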
46 changes: 27 additions & 19 deletions daliuge-engine/dlg/droputils.py
@@ -117,7 +117,7 @@ def __exit__(self, typ, value, tb):
)


-def allDropContents(drop, bufsize=4096):
+def allDropContents(drop, bufsize=4096) -> bytes:
    """
    Returns all the data contained in a given DROP
    """
@@ -267,24 +267,24 @@ def listify(o):
    return [o]


-# def save_pickle(drop: DataDROP, data: Any):
-#     """Saves a python object in pkl format"""
-#     pickle.dump(data, drop)
+def save_pickle(drop: DataDROP, data: Any):
+    """Saves a python object in pkl format"""
+    pickle.dump(data, drop)


-# def load_pickle(drop: DataDROP) -> Any:
-#     """Loads a pkl formatted data object stored in a DataDROP.
-#     Note: does not support streaming mode.
-#     """
-#     buf = io.BytesIO()
-#     desc = drop.open()
-#     while True:
-#         data = drop.read(desc)
-#         if not data:
-#             break
-#         buf.write(data)
-#     drop.close(desc)
-#     return pickle.loads(buf.getbuffer())
+def load_pickle(drop: DataDROP) -> Any:
+    """Loads a pkl formatted data object stored in a DataDROP.
+    Note: does not support streaming mode.
+    """
+    buf = io.BytesIO()
+    desc = drop.open()
+    while True:
+        data = drop.read(desc)
+        if not data:
+            break
+        buf.write(data)
+    drop.close(desc)
+    return pickle.loads(buf.getbuffer())


# async def save_pickle_iter(drop: DataDROP, data: Iterable[Any]):
@@ -298,7 +298,7 @@ def listify(o):
# yield pickle.load(p)


-def save_numpy(drop: DataDROP, ndarray: np.ndarray, allow_pickle=False):
+def save_npy(drop: DataDROP, ndarray: np.ndarray, allow_pickle=False):
    """
    Saves a numpy ndarray to a drop in npy format
    """
@@ -312,7 +312,7 @@ def save_numpy(drop: DataDROP, ndarray: np.ndarray, allow_pickle=False):
    dropio.close()


-def load_numpy(drop: DataDROP, allow_pickle=False) -> np.ndarray:
+def save_numpy(drop: DataDROP, ndarray: np.ndarray):
+    save_npy(drop, ndarray)


+def load_npy(drop: DataDROP, allow_pickle=False) -> np.ndarray:
    """
    Loads a numpy ndarray from a drop in npy format
    """
@@ -323,6 +327,10 @@ def load_numpy(drop: DataDROP, allow_pickle=False) -> np.ndarray:
    return res


+def load_numpy(drop: DataDROP):
+    return load_npy(drop)


# def save_jsonp(drop: PathBasedDrop, data: Dict[str, object]):
# with open(drop.path, 'r') as f:
# json.dump(data, f)
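The save/load pairs made public above can be exercised directly against an in-memory drop, mirroring what the new tests below do. A round-trip sketch (editor's illustration; assumes InMemoryDROP is importable from dlg.drop in this version of the codebase):

    import numpy as np
    from dlg.drop import InMemoryDROP
    from dlg import droputils

    drop = InMemoryDROP("x", "x")
    droputils.save_npy(drop, np.ones((2, 2)))  # write .npy bytes into the drop
    drop.setCompleted()                        # writing ends, reading may begin
    arr = droputils.load_npy(drop)             # read them back as an ndarray
    assert (arr == np.ones((2, 2))).all()

save_pickle() and load_pickle() follow the same pattern for arbitrary picklable Python objects.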
71 changes: 66 additions & 5 deletions daliuge-engine/test/apps/test_pyfunc.py
@@ -27,6 +27,7 @@
import unittest
import pkg_resources
import json
+import numpy

from ..manager import test_dm
from dlg import droputils, graph_loader
@@ -79,8 +80,6 @@ def _PyFuncApp(oid, uid, f, **kwargs):
        func_name=fname,
        func_code=fcode,
        func_defaults=fdefaults,
-        input_parser=pyfunc.DropParser.PICKLE,
-        output_parser=pyfunc.DropParser.PICKLE,
        **kwargs
    )

@@ -124,22 +123,84 @@ def inner_function(x, y):

_PyFuncApp("a", "a", inner_function)

-    def _test_simple_functions(self, f, input_data, output_data):
+    def test_pickle_func(self, f = lambda x: x, input_data="hello", output_data="hello"):
+        a = InMemoryDROP("a", "a")
+        b = _PyFuncApp("b", "b", f)
+        c = InMemoryDROP("c", "c")
+
+        b.addInput(a)
+        b.addOutput(c)
+
+        with DROPWaiterCtx(self, c, 5):
+            droputils.save_pickle(a, input_data)
+            a.setCompleted()
+        for drop in a, b, c:
+            self.assertEqual(DROPStates.COMPLETED, drop.status)
+        self.assertEqual(
+            output_data, droputils.load_pickle(c)
+        )
+
+    def test_eval_func(self, f = lambda x: x, input_data=None, output_data=None):
+        input_data = [2,2] if input_data is None else input_data
+        output_data = [2,2] if output_data is None else output_data
+
+        a = InMemoryDROP("a", "a")
+        b = _PyFuncApp("b", "b", f,
+            input_parser=pyfunc.DropParser.EVAL,
+            output_parser=pyfunc.DropParser.EVAL
+        )
+        c = InMemoryDROP("c", "c")
+
+        b.addInput(a)
+        b.addOutput(c)
+
+        with DROPWaiterCtx(self, c, 5):
+            a.write(repr(input_data).encode('utf-8'))
+            a.setCompleted()
+        for drop in a, b, c:
+            self.assertEqual(DROPStates.COMPLETED, drop.status)
+        self.assertEqual(
+            output_data, eval(droputils.allDropContents(c).decode('utf-8'), {}, {})
+        )
+
+    def test_npy_func(self, f = lambda x: x, input_data=None, output_data=None):
+        input_data = numpy.ones([2,2]) if input_data is None else input_data
+        output_data = numpy.ones([2,2]) if output_data is None else output_data
+
+        a = InMemoryDROP("a", "a")
+        b = _PyFuncApp("b", "b", f,
+            input_parser=pyfunc.DropParser.NPY,
+            output_parser=pyfunc.DropParser.NPY
+        )
+        c = InMemoryDROP("c", "c")
+
+        b.addInput(a)
+        b.addOutput(c)
+
+        with DROPWaiterCtx(self, c, 5):
+            droputils.save_npy(a, input_data)
+            a.setCompleted()
+        for drop in a, b, c:
+            self.assertEqual(DROPStates.COMPLETED, drop.status)
+        numpy.testing.assert_equal(
+            output_data, droputils.load_npy(c)
+        )
+
+    def _test_simple_functions(self, f, input_data, output_data):
        a, c = [InMemoryDROP(x, x) for x in ("a", "c")]
        b = _PyFuncApp("b", "b", f)
        b.addInput(a)
        b.addOutput(c)

        with DROPWaiterCtx(self, c, 5):
-            a.write(pickle.dumps(input_data)) # @UndefinedVariable
+            a.write(pickle.dumps(input_data))
            a.setCompleted()

        for drop in a, b, c:
            self.assertEqual(DROPStates.COMPLETED, drop.status)
        self.assertEqual(
            output_data, pickle.loads(droputils.allDropContents(c))
-        ) # @UndefinedVariable
+        )

    def test_func1(self):
        """Checks that func1 in this module works when wrapped"""
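Because the new tests take the wrapped function and its expected data as defaulted parameters, they double as reusable fixtures: a derived check can push different functions and payloads through the same graph plumbing. A hypothetical example (editor's illustration, not part of the commit), written as another method of the same test class:

    def test_pickle_square(self):
        # reuses the parameterized test above with a different function
        self.test_pickle_func(f=lambda x: x * x, input_data=3, output_data=9)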
1 change: 1 addition & 0 deletions docs/development/app_development/app_index.rst
@@ -25,6 +25,7 @@ integration and testing during component development. As mentioned already, for
   dynlib_components
   docker_components
   service_components
+   pyfunc_components
   datadrop_io
   wrap_existing
   test_and_debug
41 changes: 27 additions & 14 deletions docs/development/app_development/datadrop_io.rst
@@ -1,3 +1,5 @@
+.. _datadrop_io:
+
DataDROP I/O
============

@@ -24,15 +26,18 @@ Writing data into an output drop is similar but simpler. Application authors nee
one or more times the :attr:`write <dlg.drop.DataDROP.write>` method
with the data that needs to be written.

-String Serialization
---------------------
+Serialization
+-------------

-Many data drops are capable of storing data in different formats managed by the app drop.
+Many data components are capable of storing data in multiple formats determined by the drop component. The common data I/O interface allows app components to be compatible with many data component types; however, different app components connected to the same data component must use compatible serialization and deserialization types and utilities.

+String Serialization
+^^^^^^^^^^^^^^^^^^^^

Raw String
""""""""""

-The simplest serialization format supported directly by `DataDrop.write` and `DataDrop.read`.
+The simplest deserialization format supported directly by `DataDrop.write` and `DataDrop.read`.

JSON (.json)
""""""""""""
@@ -59,26 +64,34 @@ XML (.xml)
""""""""""

Markup format with similar features to YAML but with the addition of attributes. Serialization can be performed
-using `dicttoxml` or both serialization and deserialiation using `xml.etree.ElementTree`.
+using `dicttoxml` or both serialization and deserialization using `xml.etree.ElementTree`.


Python Eval (.py)
"""""""""""""""""

Python expressions and literals are valid string serialization formats whereby the string data is interpreted as Python code. Serialization is typically performed using the `__repr__` instance method and deserialization using `eval` or `ast.literal_eval`.

Binary Serialization
---------------------
+^^^^^^^^^^^^^^^^^^^^

Data drops may also store binary formats that are typically more efficient than string formats
and may utilize the python buffer protocol.

Raw Bytes
"""""""""

Data drops can always be read as raw bytes using `droputils.allDropContents` and written to using `DataDROP.write`. Reading as a bytes object creates a read-only in-memory data copy that may not be as performant as other drop utilities.
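A minimal raw-bytes round trip (editor's illustration; assumes `InMemoryDROP` from `dlg.drop`)::

    from dlg.drop import InMemoryDROP
    from dlg import droputils

    d = InMemoryDROP("r", "r")
    d.write(b"hello")  # raw bytes in
    d.setCompleted()
    assert droputils.allDropContents(d) == b"hello"  # raw bytes out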

Pickle (.pkl)
"""""""""""""

-Default serialazation format. Use `save_pickle` for serialization to this format and
-`allDropContents` or `load_pickle` for deserialization.
+Default serialization format, capable of serializing any Python object. Use `save_pickle` for serialization to this format and `load_pickle` for deserialization.

Numpy (.npy)
""""""""""""

-Portable numpy serialization format. Use `save_numpy`
+Portable numpy serialization format. Use `save_numpy` for serialization and `load_numpy` for deserialization.

Numpy Zipped (.npz)
"""""""""""""""""""
Expand All @@ -87,15 +100,15 @@ Portable zipped numpy serialization format. Consists of a .zip directory holding
files.

Table Serialization
--------------------
+^^^^^^^^^^^^^^^^^^^

parquet (.parquet)
"""""""""""""""""""

Open source column-based relational data format from Apache.

-Drop Specialized Serialization
-------------------------------
+Specialized Serialization
+^^^^^^^^^^^^^^^^^^^^^^^^^

Data drops such as RDBMSDrop manage their own record format and are
interfaced using relational data objects such as `dict`, `pyarrow.RecordBatch` or `pandas.DataFrame`.
