Skip to content

Commit

Permalink
Merge pull request #86 from ICRAR/liu-189
Browse files Browse the repository at this point in the history
LIU 189 Expose PyFuncApp
  • Loading branch information
awicenec committed Dec 9, 2021
2 parents f36b4d2 + 9cf8add commit 7fa67ff
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 66 deletions.
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/apps/archiving.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ExternalStoreApp(BarrierAppDROP):
where it resides.
"""

compontent_meta = dlg_component(
component_meta = dlg_component(
"ExternalStoreApp",
"An application that takes its input DROP (which must be one, and only one) "
"and creates a copy of it in a completely external store, from the point "
Expand Down Expand Up @@ -106,7 +106,7 @@ class NgasArchivingApp(ExternalStoreApp):
supported by the framework, and not only filesystem objects.
"""

compontent_meta = dlg_component(
component_meta = dlg_component(
"NgasArchivingApp",
"An ExternalStoreApp class that takes its input DROP and archives it in "
"an NGAS server. It currently deals with non-container DROPs only.",
Expand Down
8 changes: 4 additions & 4 deletions daliuge-engine/dlg/apps/bash_shell_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ class BashShellApp(BashShellBase, BarrierAppDROP):
StreamingOutputBashApp for those cases.
"""

compontent_meta = dlg_component(
component_meta = dlg_component(
"BashShellApp",
"An app that runs a bash command in batch mode",
[dlg_batch_input("text/*", [])],
Expand All @@ -330,7 +330,7 @@ class StreamingOutputBashApp(BashShellBase, BarrierAppDROP):
next application.
"""

compontent_meta = dlg_component(
component_meta = dlg_component(
"StreamingOutputBashApp",
"Like BashShellApp, but its stdout is a stream "
"of data that is fed into the next application.",
Expand All @@ -357,7 +357,7 @@ class StreamingInputBashApp(StreamingInputBashAppBase):
this application off.
"""

compontent_meta = dlg_component(
component_meta = dlg_component(
"StreamingInputBashApp",
"An app that runs a bash command that consumes data from stdin.",
[dlg_batch_input("text/*", [])],
Expand All @@ -377,7 +377,7 @@ class StreamingInputOutputBashApp(StreamingInputBashAppBase):
fed into the next application.
"""

compontent_meta = dlg_component(
component_meta = dlg_component(
"StreamingInputOutputBashApp",
"Like StreamingInputBashApp, but its stdout is also a "
"stream of data that is fed into the next application.",
Expand Down
14 changes: 12 additions & 2 deletions daliuge-engine/dlg/apps/crc.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class CRCApp(BarrierAppDROP):
not something really intended to be used in a production system
"""

compontent_meta = dlg_component(
component_meta = dlg_component(
"CRCApp",
"A BarrierAppDROP that calculates the " "CRC of the single DROP it consumes",
[dlg_batch_input("binary/*", [])],
Expand Down Expand Up @@ -72,13 +72,23 @@ def run(self):
outputDrop.write(str(crc).encode("utf8"))


##
# @brief CRCStreamApp
# @details Calculate CRC in the streaming mode
# i.e. A "streamingConsumer" of its predecessor in the graph
# @par EAGLE_START
# @param category PythonApp
# @param[in] param/appclass Application Class/dlg.apps.crc.CRCStreamApp/String/readonly/
# \~English Application class
# @param[out] port/data Data/String/
# @par EAGLE_END
class CRCStreamApp(AppDROP):
"""
Calculate CRC in the streaming mode
i.e. A "streamingConsumer" of its predecessor in the graph
"""

compontent_meta = dlg_component(
component_meta = dlg_component(
"CRCStreamApp",
"Calculate CRC in the streaming mode.",
[dlg_batch_input("binary/*", [])],
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/apps/plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
# \~English MS output file
# @par EAGLE_END
class MSStreamingPlasmaConsumer(AppDROP):
compontent_meta = dlg_component(
component_meta = dlg_component(
"MSStreamingPlasmaConsumer",
"MS Plasma Consumer",
[dlg_batch_input("binary/*", [])],
Expand Down Expand Up @@ -145,7 +145,7 @@ def dropCompleted(self, uid, drop_state):
# \~English Plasma MS output
# @par EAGLE_END
class MSStreamingPlasmaProducer(BarrierAppDROP):
compontent_meta = dlg_component(
component_meta = dlg_component(
"MSStreamingPlasmaProducer",
"MS Plasma Producer",
[dlg_batch_input("binary/*", [])],
Expand Down
115 changes: 92 additions & 23 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,30 @@
#
"""Module implementing the PyFuncApp class"""

import ast
import base64
import collections
import importlib
import inspect
import logging
import pickle

from typing import Callable
import dill

from .. import droputils, utils
from ..drop import BarrierAppDROP
from ..exceptions import InvalidDropException

from dlg import droputils, utils
from dlg.drop import BarrierAppDROP
from dlg.exceptions import InvalidDropException
from dlg.meta import (
dlg_bool_param,
dlg_string_param,
dlg_float_param,
dlg_dict_param,
dlg_component,
dlg_batch_input,
dlg_batch_output,
dlg_streaming_input,
)

logger = logging.getLogger(__name__)

Expand All @@ -57,7 +68,7 @@ def serialize_func(f):
a = inspect.getfullargspec(f)
if a.defaults:
fdefaults = dict(
zip(a.args[-len(a.defaults) :], [serialize_data(d) for d in a.defaults])
zip(a.args[-len(a.defaults):], [serialize_data(d) for d in a.defaults])
)
logger.debug("Defaults for function %r: %r", f, fdefaults)
return fser, fdefaults
Expand Down Expand Up @@ -86,6 +97,30 @@ def import_using_code(code):
return dill.loads(code)


##
# @brief PyFuncApp
# @details An application that wraps a simple python function.
# The inputs of the application are treated as the arguments of the function.
# Conversely, the output of the function is treated as the output of the
# application. If the application has more than one output, the result of
# calling the function is treated as an iterable, with each individual object
# being written to its corresponding output.
# @par EAGLE_START
# @param category PythonApp
# @param[in] param/appclass Application Class/dlg.apps.pyfunc.PyFuncApp/String/readonly/
# \~English Application class
# @param[in] param/func_name Function Name//String/readwrite/
# \~English Python fuction name
# @param[in] param/func_code Function Code//String/readwrite/
# \~English Python fuction code, e.g. 'def fuction_name(args): return args'
# @param[in] param/pickle Pickle//bool/readwrite/
# \~English Whether the python arguments are pickled.
# @param[in] param/func_defaults Function Defaults//String/readwrite/
# \~English Mapping from argname to default value. Should match only the last part
# of the argnames list
# @param[in] param/func_arg_mapping Function Arguments Mapping//String/readwrite/
# \~English Mapping between argument name and input drop uids
# @par EAGLE_END
class PyFuncApp(BarrierAppDROP):
"""
An application that wraps a simple python function.
Expand All @@ -104,45 +139,76 @@ class PyFuncApp(BarrierAppDROP):
Both inputs and outputs are serialized using the pickle protocol.
"""

component_meta = dlg_component(
"PyFuncApp",
"Py Func App.",
[dlg_batch_input("binary/*", [])],
[dlg_batch_output("binary/*", [])],
[dlg_streaming_input("binary/*")],
)

func_name = dlg_string_param("func_name", None)

# fcode = dlg_bytes_param("func_code", None) # bytes or base64 string

pickle = dlg_bool_param("pickle", True)

func_arg_mapping = dlg_dict_param("func_arg_mapping", {})

func_defaults = dlg_dict_param("func_defaults", {})

f: Callable
fdefaults: dict

def initialize(self, **kwargs):
BarrierAppDROP.initialize(self, **kwargs)

self.fname = fname = self._getArg(kwargs, "func_name", None)
fcode = self._getArg(kwargs, "func_code", None)
if not fname and not fcode:
self.fcode = self._getArg(kwargs, "func_code", None)
if not self.func_name and not self.fcode:
raise InvalidDropException(
self, "No function specified (either via name or code)"
)

if not fcode:
self.f = import_using_name(self, fname)
# Lookup function or import bytecode as a function
if not self.fcode:
self.f = import_using_name(self, self.func_name)
else:
if not isinstance(fcode, bytes):
fcode = base64.b64decode(fcode.encode("utf8"))
self.f = import_using_code(fcode)
if not isinstance(self.fcode, bytes):
self.fcode = base64.b64decode(self.fcode.encode("utf8"))
self.f = import_using_code(self.fcode)

# Mapping from argname to default value. Should match only the last part
# of the argnames list
fdefaults = self._getArg(kwargs, "func_defaults", {}) or {}
self.fdefaults = {name: deserialize_data(d) for name, d in fdefaults.items()}
logger.debug("Default values for function %s: %r", self.fname, self.fdefaults)
if isinstance(self.func_defaults, str):
self.func_defaults = ast.literal_eval(self.func_defaults)

if self.pickle:
self.fdefaults = {name: deserialize_data(d) for name, d in self.func_defaults.items()}
else:
self.fdefaults = self.func_defaults

logger.debug(f"Default values for function {self.func_name}: {self.fdefaults}")

# Mapping between argument name and input drop uids
self.func_arg_mapping = self._getArg(kwargs, "func_arg_mapping", {})
logger.debug("Input mapping: %r", self.func_arg_mapping)
logger.debug(f"Input mapping: {self.func_arg_mapping}")

def run(self):

# Inputs are un-pickled and treated as the arguments of the function
# Their order must be preserved, so we use an OrderedDict
all_contents = lambda x: pickle.loads(droputils.allDropContents(x))
if self.pickle:
all_contents = lambda x: pickle.loads(droputils.allDropContents(x))
else:
all_contents = lambda x: ast.literal_eval(droputils.allDropContents(x).decode('utf-8'))

inputs = collections.OrderedDict()
for uid, i in self._inputs.items():
inputs[uid] = all_contents(i)
for uid, drop in self._inputs.items():
inputs[uid] = all_contents(drop)

# Keyword arguments are made up by the default values plus the inputs
# that match one of the keyword argument names
argnames = inspect.getfullargspec(self.f).args

kwargs = {
name: inputs.pop(uid)
for name, uid in self.func_arg_mapping.items()
Expand All @@ -152,7 +218,7 @@ def run(self):
# The rest of the inputs are the positional arguments
args = list(inputs.values())

logger.debug("Running %s with args=%r, kwargs=%r", self.fname, args, kwargs)
logger.debug(f"Running {self.func_name} with args={args}, kwargs={kwargs}")
result = self.f(*args, **kwargs)

# Depending on how many outputs we have we treat our result
Expand All @@ -162,4 +228,7 @@ def run(self):
if len(outputs) == 1:
result = [result]
for r, o in zip(result, outputs):
o.write(pickle.dumps(r)) # @UndefinedVariable
if self.pickle:
o.write(pickle.dumps(r)) # @UndefinedVariable
else:
o.write(repr(r).encode('utf-8'))
30 changes: 25 additions & 5 deletions daliuge-engine/dlg/apps/scp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,44 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
from ..remote import copyTo, copyFrom
from ..drop import (
from dlg.remote import copyTo, copyFrom
from dlg.drop import (
BarrierAppDROP,
NgasDROP,
InMemoryDROP,
NullDROP,
RDBMSDrop,
ContainerDROP,
)
from ..meta import (
from dlg.meta import (
dlg_string_param,
dlg_float_param,
dlg_component,
dlg_batch_input,
dlg_batch_output,
dlg_streaming_input,
)


##
# @brief ScpApp
# @details A BarrierAppDROP that copies the content of its single input onto its
# single output via SSH's scp protocol.
# @par EAGLE_START
# @param category PythonApp
# @param[in] param/appclass Application Class/dlg.apps.scp.ScpApp/String/readonly/
# \~English Application class
# @param[in] param/remoteUser Remote User//String/readwrite/
# \~English Remote user address
# @param[in] param/pkeyPath Private Key Path//String/readwrite/
# \~English Private key path
# @param[in] param/timeout Timeout//Float/readwrite/
# \~English Connection timeout in seconds
# @param[in] port/file File/PathBasedDrop/
# \~English Input file path
# @param[out] port/file File/PathBasedDrop/
# \~English Output file path
# @par EAGLE_END
class ScpApp(BarrierAppDROP):
"""
A BarrierAppDROP that copies the content of its single input onto its
Expand All @@ -52,7 +72,7 @@ class ScpApp(BarrierAppDROP):
two I/O DROPs.
"""

compontent_meta = dlg_component(
component_meta = dlg_component(
"ScpApp",
"A BarrierAppDROP that copies the content of its single "
"input onto its single output via SSHs scp protocol.",
Expand All @@ -71,7 +91,7 @@ class ScpApp(BarrierAppDROP):

remoteUser = dlg_string_param("remoteUser", None)
pkeyPath = dlg_string_param("pkeyPath", None)
timeout = dlg_string_param("timeout", None)
timeout = dlg_float_param("timeout", None)

def initialize(self, **kwargs):
BarrierAppDROP.initialize(self, **kwargs)
Expand Down
Loading

0 comments on commit 7fa67ff

Please sign in to comment.