Skip to content

Commit

Permalink
New PyFuncApp
Browse files Browse the repository at this point in the history
This is described in LIU-10. The new PyFuncApp is basically a wrapper
around python functions. Unit tests check that the basic functionality
works locally, and across NodeManagers (i.e., through the RPC
mechanism).

Signed-off-by: Rodrigo Tobar <rtobar@icrar.org>
  • Loading branch information
rtobar committed Nov 1, 2017
1 parent abeeb53 commit 4e5861c
Show file tree
Hide file tree
Showing 2 changed files with 243 additions and 0 deletions.
84 changes: 84 additions & 0 deletions dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia, 2017
# Copyright by UWA (in the framework of the ICRAR)
# All rights reserved
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
"""Module implementing the PyFuncApp class"""

import importlib

import six.moves.cPickle as pickle # @UnresolvedImport

from .. import droputils
from ..drop import BarrierAppDROP
from ..exceptions import InvalidDropException


class PyFuncApp(BarrierAppDROP):
"""
An application that wraps a simple python function.
The inputs of the application are treated as the arguments of the function.
Conversely, the output of the function is treated as the output of the
application. If the application has more than one output, the result of
calling the function is treated as an iterable, with each individual object
being written to its corresponding output.
Users indicate the function to be wrapped via the ``function`` parameter,
which is of course mandatory.
Both inputs and outputs are serialized using the pickle protocol.
"""

def initialize(self, **kwargs):
BarrierAppDROP.initialize(self, **kwargs)

fname = self._getArg(kwargs, 'function', None)
if not fname:
raise InvalidDropException(self, 'No function specified')

# The name has the form pack1.pack2.mod.func
parts = fname.split('.')
if len(parts) < 2:
msg = '%s does not contain a module name' % fname
raise InvalidDropException(self, msg)

modname, fname = '.'.join(parts[:-1]), parts[-1]
try:
mod = importlib.import_module(modname, __name__)
self.f = getattr(mod, fname)
except ImportError as e:
raise InvalidDropException(self, 'Error when loading module %s: %s' % (modname, str(e)))
except AttributeError:
raise InvalidDropException(self, 'Module %s has no member %s' % (modname, fname))

def run(self):

# Inputs are un-pickled and treated as the arguments of the function
args = map(lambda x: pickle.loads(droputils.allDropContents(x)), self.inputs) # @UndefinedVariable
result = self.f(*args)

# Depending on how many outputs we have we treat our result
# as an iterable or as a single object. Each result is pickled
# and written to its corresponding output
outputs = self.outputs
if len(outputs) == 1:
result = [result]
for r, o in zip(result, outputs):
o.write(pickle.dumps(r)) # @UndefinedVariable
159 changes: 159 additions & 0 deletions test/apps/test_pyfunc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia, 2017
# Copyright by UWA (in the framework of the ICRAR)
# All rights reserved
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
import os
import random
import unittest

import six.moves.cPickle as pickle # @UnresolvedImport

from ..manager import test_dm
from dlg import droputils
from dlg.apps.pyfunc import PyFuncApp
from dlg.ddap_protocol import DROPStates, DROPRel, DROPLinkType
from dlg.drop import InMemoryDROP
from dlg.droputils import DROPWaiterCtx
from dlg.exceptions import InvalidDropException


def func1(arg1):
return arg1

def func2(arg1):
return arg1 * 2;

def func3():
return ['b', 'c', 'd']

def _PyFuncApp(oid, uid, f):
return PyFuncApp(oid, uid, function=__name__ + '.' + f)

class TestPyFuncApp(unittest.TestCase):

def test_missing_function_param(self):
self.assertRaises(InvalidDropException, PyFuncApp, 'a', 'a')

def test_invalid_function_param(self):
# The function doesn't have a module
self.assertRaises(InvalidDropException, PyFuncApp, 'a', 'a', function='func1')

def test_function_invalid_module(self):
# The function lives in an unknown module/package
self.assertRaises(InvalidDropException, PyFuncApp, 'a', 'a', function='doesnt_exist.func1')

def test_function_invalid_fname(self):
# The function lives in an unknown module/package
self.assertRaises(InvalidDropException, PyFuncApp, 'a', 'a', function='test.apps.test_pyfunc.doesnt_exist')

def test_valid_creation(self):
_PyFuncApp('a', 'a', 'func1')

def _test_func1_and_2(self, f, input_data, output_data):

a, c = [InMemoryDROP(x, x) for x in ('a', 'c')]
b = _PyFuncApp('b', 'b', f)
b.addInput(a)
b.addOutput(c)

with DROPWaiterCtx(self, c, 5):
a.write(pickle.dumps(input_data)) # @UndefinedVariable
a.setCompleted()

for drop in a, b, c:
self.assertEqual(DROPStates.COMPLETED, drop.status)
self.assertEqual(output_data, pickle.loads(droputils.allDropContents(c))) # @UndefinedVariable

def test_func1(self):
"""Checks that func1 in this module works when wrapped"""
data = os.urandom(64)
self._test_func1_and_2('func1', data, data)

def test_func2(self):
"""Checks that func2 in this module works when wrapped"""
n = random.randint(0, 1e6)
self._test_func1_and_2('func2', n, 2 * n)

def _test_func3(self, output_drops, expected_outputs):

a = _PyFuncApp('a', 'a', 'func3')
for drop in output_drops:
a.addOutput(drop)

drops = [a] + droputils.listify(output_drops)
a.execute()

for drop in drops:
self.assertEqual(DROPStates.COMPLETED, drop.status)

for expected_output, drop in zip(expected_outputs, output_drops):
self.assertEqual(expected_output, pickle.loads(droputils.allDropContents(drop))) # @UndefinedVariable

def test_func3_singleoutput(self):
"""
Checks that func3 in this module works when wrapped as an application
with multiple outputs.
"""
self._test_func3([InMemoryDROP('b', 'b')], [['b', 'c', 'd']])

def test_func3_multioutput(self):
"""
Checks that func3 in this module works when wrapped as an application
with multiple outputs.
"""
output_drops = [InMemoryDROP(x, x) for x in ('b', 'c', 'd')]
self._test_func3(output_drops, ('b', 'c', 'd'))

class PyFuncAppIntraNMTest(test_dm.NMTestsMixIn, unittest.TestCase):

def test_input_in_remote_nm(self):
"""
A test similar in spirit to TestDM.test_runGraphOneDOPerDom, but where
application B is a PyFuncApp. This makes sure that PyFuncApp work fine
across Node Managers.
NM #1 NM #2
======= =============
| A --|----|-> B --> C |
======= =============
"""
g1 = [{"oid":"A", "type":"plain", "storage": "memory"}]
g2 = [{"oid":"B", "type":"app", "app":"dfms.apps.pyfunc.PyFuncApp", "function": __name__ + '.func1'},
{"oid":"C", "type":"plain", "storage": "memory", "producers":["B"]}]
rels = [DROPRel('A', DROPLinkType.INPUT, 'B')]
a_data = pickle.dumps(os.urandom(32))
self._test_runGraphInTwoNMs(g1, g2, rels, a_data, a_data)

def test_output_in_remote_nm(self):
"""
Like the above, but with this graph. In this case the output (instead of
the input) is in a remote Node Manager.
NM #1 NM #2
============= =======
| A --> B --|----|-> C |
============= =======
"""
g1 = [{"oid":"A", "type":"plain", "storage": "memory", "consumers": ['B']},
{"oid":"B", "type":"app", "app":"dfms.apps.pyfunc.PyFuncApp", "function": __name__ + '.func1'}]
g2 = [{"oid":"C", "type":"plain", "storage": "memory"}]
rels = [DROPRel('B', DROPLinkType.PRODUCER, 'C')]
a_data = pickle.dumps(os.urandom(32))
self._test_runGraphInTwoNMs(g1, g2, rels, a_data, a_data)

0 comments on commit 4e5861c

Please sign in to comment.