Skip to content

Commit

Permalink
Merge edcbbca into cc15b7d
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed Jan 14, 2022
2 parents cc15b7d + edcbbca commit 1270529
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 18 deletions.
75 changes: 71 additions & 4 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
"""Applications used as examples, for testing, or in simple situations"""
import pickle
import random
from typing import List
import urllib.error
import urllib.request

import time
import ast
import numpy as np

from dlg import droputils, utils
Expand All @@ -40,7 +42,7 @@
dlg_batch_output,
dlg_streaming_input
)

from dlg.exceptions import DaliugeException
from dlg.apps.pyfunc import serialize_data, deserialize_data


Expand Down Expand Up @@ -439,12 +441,14 @@ class GenericScatterApp(BarrierAppDROP):
[dlg_streaming_input("binary/*")],
)

# automatically populated by scatter node
num_of_copies: int = dlg_int_param("num_of_copies", 1)

def initialize(self, **kwargs):
super(GenericScatterApp, self).initialize(**kwargs)

def run(self):
# split it as many times as we have outputs
numSplit = len(self.outputs)
numSplit = self.num_of_copies
cont = droputils.allDropContents(self.inputs[0])
# if the data is of type string it is not pickled, but stored as a binary string.
try:
Expand All @@ -466,6 +470,70 @@ def run(self):
o.write(d) # average across inputs


##
# @brief GenericNpyScatterApp
# @details An APP that splits about any axis on any npy format data drop
# into as many parts as the app has outputs, provided that the initially converted numpy
# array has enough elements. The return will be a numpy array of arrays, where the first
# axis is of length len(outputs). The modulo remainder of the length of the original array and
# the number of outputs will be distributed across the first len(outputs)-1 elements of the
# resulting array.
# @par EAGLE_START
# @param category PythonApp
# @param[in] param/appclass Application Class/dlg.apps.simple.GenericNpyScatterApp/String/readonly/
# \~English Application class
# @param[in] param/scatter_axes Scatter Axes/String/readwrite
# \~English The axes to split input ndarrays on, e.g. [0,0,0], length must
# match the number of input ports
# @param[out] port/array Array/npy/
# \~English A numpy array of arrays
# @par EAGLE_END
class GenericNpyScatterApp(BarrierAppDROP):
"""
An APP that splits an object that has a len attribute into <num_of_copies> parts and
returns a numpy array of arrays.
"""

component_meta = dlg_component(
"GenericNpyScatterApp",
"Scatter an array like object into <num_of_copies> parts",
[dlg_batch_input("binary/*", [])],
[dlg_batch_output("binary/*", [])],
[dlg_streaming_input("binary/*")],
)

# automatically populated by scatter node
num_of_copies: int = dlg_int_param("num_of_copies", 1)
scatter_axes: List[int] = dlg_string_param("scatter_axes", "[0]")

def initialize(self, **kwargs):
super(GenericNpyScatterApp, self).initialize(**kwargs)
self.scatter_axes = ast.literal_eval(self.scatter_axes)

def run(self):
if len(self.inputs) * self.num_of_copies != len(self.outputs):
raise DaliugeException(\
f"expected {len(self.inputs) * self.num_of_copies} outputs,\
got {len(self.outputs)}")
if len(self.inputs) != len(self.scatter_axes):
raise DaliugeException(\
f"expected {len(self.inputs)} axes,\
got {len(self.scatter_axes)}")

# split it as many times as we have outputs
self.num_of_copies = self.num_of_copies

for in_index in range(len(self.inputs)):
nObj = droputils.load_numpy(self.inputs[in_index])
try:
result = np.array_split(nObj, self.num_of_copies, axis=self.scatter_axes[in_index])
except IndexError as err:
raise err
for split_index in range(self.num_of_copies):
out_index = in_index * self.num_of_copies + split_index
droputils.save_numpy(self.outputs[out_index], result[split_index])


class SimpleBranch(BranchAppDrop, NullBarrierApp):
"""Simple branch app that is told the result of its condition"""

Expand All @@ -480,7 +548,6 @@ def condition(self):
return self.result



##
# @brief ListAppendThrashingApp\n
# @details A testing APP that appends a random integer to a list num times.
Expand Down
5 changes: 3 additions & 2 deletions daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,15 +266,16 @@ def listify(o):

def save_numpy(drop, ndarray: np.ndarray, allow_pickle=False):
"""
Saves a numpy ndarray to a drop
Saves a numpy ndarray to a drop in npy format
"""
bio = io.BytesIO()
np.save(bio, ndarray, allow_pickle=allow_pickle)
drop.write(bio.getbuffer())


def load_numpy(drop, allow_pickle=False) -> np.ndarray:
"""
Loads a numpy ndarray from a drop
Loads a numpy ndarray from a drop in npy format
"""
dropio = drop.getIO()
dropio.open(OpenMode.OPEN_READ)
Expand Down
65 changes: 57 additions & 8 deletions daliuge-engine/test/apps/test_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,23 @@
import time
import unittest
from multiprocessing.pool import ThreadPool
from numpy import random, mean, array, concatenate

from numpy import random, mean, array, concatenate, random, testing
from psutil import cpu_count

from dlg import droputils
from dlg.apps.simple import GenericScatterApp, SleepApp, CopyApp, SleepAndCopyApp, \
from dlg.apps.simple import (
GenericScatterApp,
GenericNpyScatterApp,
SleepApp,
CopyApp,
SleepAndCopyApp,
ListAppendThrashingApp
)
from dlg.apps.simple import RandomArrayApp, AverageArraysApp, HelloWorldApp
from dlg.ddap_protocol import DROPStates
from dlg.drop import NullDROP, InMemoryDROP, FileDROP, NgasDROP

if sys.version_info >= (3, 8):
from dlg.manager.shared_memory_manager import DlgSharedMemoryManager
from numpy import random, mean, array, concatenate
from psutil import cpu_count

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -146,7 +149,7 @@ def test_helloworldapp(self):

def test_parallelHelloWorld(self):
m0 = InMemoryDROP("m0", "m0")
s = GenericScatterApp("s", "s")
s = GenericScatterApp("s", "s", num_of_copies=4)
greets = ["World", "Solar system", "Galaxy", "Universe"]
m0.write(pickle.dumps(greets))
s.addInput(m0)
Expand Down Expand Up @@ -194,7 +197,7 @@ def test_genericScatter(self):
data_in = random.randint(0, 100, size=100)
b = InMemoryDROP("b", "b")
b.write(pickle.dumps(data_in))
s = GenericScatterApp("s", "s")
s = GenericScatterApp("s", "s", num_of_copies=2)
s.addInput(b)
o1 = InMemoryDROP("o1", "o1")
o2 = InMemoryDROP("o2", "o2")
Expand All @@ -207,6 +210,52 @@ def test_genericScatter(self):
data_out = concatenate([data1, data2])
self.assertEqual(data_in.all(), data_out.all())

def test_genericNpyScatter(self):
data_in = random.rand(100, 100)
b = InMemoryDROP("b", "b")
droputils.save_numpy(b, data_in)
s = GenericNpyScatterApp("s", "s", num_of_copies=2)
s.addInput(b)
o1 = InMemoryDROP("o1", "o1")
o2 = InMemoryDROP("o2", "o2")
for x in o1, o2:
s.addOutput(x)
self._test_graph_runs((b, s, o1, o2), b, (o1, o2), timeout=4)

data1 = droputils.load_numpy(o1)
data2 = droputils.load_numpy(o2)
data_out = concatenate([data1, data2])
self.assertEqual(data_in.all(), data_out.all())

def test_genericNpyScatter_multi(self):
data1_in = random.rand(100, 100)
data2_in = random.rand(100, 100)
b = InMemoryDROP("b", "b")
c = InMemoryDROP("c", "c")
droputils.save_numpy(b, data1_in)
droputils.save_numpy(c, data2_in)
s = GenericNpyScatterApp("s", "s", num_of_copies=2, scatter_axes="[0,0]")
s.addInput(b)
s.addInput(c)
o1 = InMemoryDROP("o1", "o1")
o2 = InMemoryDROP("o2", "o2")
o3 = InMemoryDROP("o3", "o3")
o4 = InMemoryDROP("o4", "o4")
for x in o1, o2, o3, o4:
s.addOutput(x)
self._test_graph_runs((b, s, o1, o2, o3, o4), (b, c), (o1, o2, o3, o4), timeout=4)

data11 = droputils.load_numpy(o1)
data12 = droputils.load_numpy(o2)
data1_out = concatenate([data11, data12])
self.assertEqual(data1_out.shape, data1_in.shape)
testing.assert_array_equal(data1_out, data1_in)

data21 = droputils.load_numpy(o3)
data22 = droputils.load_numpy(o4)
data2_out = concatenate([data21, data22])
testing.assert_array_equal(data2_out, data2_in)

def test_listappendthrashing(self, size=1000):
a = InMemoryDROP('a', 'a')
b = ListAppendThrashingApp('b', 'b', size=size)
Expand Down
12 changes: 8 additions & 4 deletions daliuge-translator/dlg/dropmake/dm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,11 +466,15 @@ def convert_construct(lgo):
if "group" in node:
app_node["group"] = node["group"]

for app_fd_name in ["appFields", "inputAppFields"]:
if app_fd_name in node:
for afd in node[app_fd_name]:
INPUT_APP_FIELDS = "inputAppFields"
if INPUT_APP_FIELDS in node:
# inputAppFields are converted to fields to be processed like
# regular application drops
app_node["fields"] = list(node[INPUT_APP_FIELDS])
app_node["fields"] += node["fields"]
# TODO: remove, use fields list
for afd in node[INPUT_APP_FIELDS]:
app_node[afd["name"]] = afd["value"]
break

if node["category"] == Categories.GATHER:
app_node["group_start"] = 1
Expand Down

0 comments on commit 1270529

Please sign in to comment.