
Commit

Merge 0c1170f into cc15b7d
calgray authored Jan 14, 2022
2 parents cc15b7d + 0c1170f commit e30e15c
Showing 4 changed files with 84 additions and 12 deletions.
75 changes: 71 additions & 4 deletions daliuge-engine/dlg/apps/simple.py
@@ -22,10 +22,12 @@
"""Applications used as examples, for testing, or in simple situations"""
import pickle
import random
from typing import List
import urllib.error
import urllib.request

import time
import ast
import numpy as np

from dlg import droputils, utils
@@ -40,7 +42,7 @@
dlg_batch_output,
dlg_streaming_input
)

from dlg.exceptions import DaliugeException
from dlg.apps.pyfunc import serialize_data, deserialize_data


@@ -439,12 +441,14 @@ class GenericScatterApp(BarrierAppDROP):
[dlg_streaming_input("binary/*")],
)

# automatically populated by scatter node
num_of_copies: int = dlg_int_param("num_of_copies", 1)

def initialize(self, **kwargs):
super(GenericScatterApp, self).initialize(**kwargs)

def run(self):
# split it as many times as we have outputs
numSplit = len(self.outputs)
numSplit = self.num_of_copies
cont = droputils.allDropContents(self.inputs[0])
# if the data is of type string it is not pickled, but stored as a binary string.
try:
@@ -466,6 +470,70 @@ def run(self):
o.write(d) # average across inputs


##
# @brief GenericNpyScatterApp
# @details An APP that splits about any object that can be converted to a numpy array
# into as many parts as the app has outputs, provided that the initially converted numpy
# array has enough elements. The return will be a numpy array of arrays, where the first
# axis is of length len(outputs). The modulo remainder of the length of the original array and
# the number of outputs will be distributed across the first len(outputs)-1 elements of the
# resulting array.
# @par EAGLE_START
# @param category PythonApp
# @param[in] param/appclass Application Class/dlg.apps.simple.GenericNpyScatterApp/String/readonly/
# \~English Application class
# @param[in] param/scatter_axes Scatter Axes/String/readwrite
# \~English The axes to split input ndarrays on, e.g. [0,0,0], length must
# match the number of input ports
# @param[out] port/array Array/npy/
# \~English A numpy array of arrays
# @par EAGLE_END
class GenericNpyScatterApp(BarrierAppDROP):
"""
An APP that splits an object that has a len attribute into <num_of_copies> parts and
returns a numpy array of arrays.
"""

component_meta = dlg_component(
"GenericNpyScatterApp",
"Scatter an array like object into <num_of_copies> parts",
[dlg_batch_input("binary/*", [])],
[dlg_batch_output("binary/*", [])],
[dlg_streaming_input("binary/*")],
)

# automatically populated by scatter node
num_of_copies: int = dlg_int_param("num_of_copies", 1)
scatter_axes: List[int] = dlg_string_param("scatter_axes", "[0]")

def initialize(self, **kwargs):
super(GenericNpyScatterApp, self).initialize(**kwargs)
self.scatter_axes = ast.literal_eval(self.scatter_axes)

def run(self):
if len(self.inputs) * self.num_of_copies != len(self.outputs):
raise DaliugeException(\
f"expected {len(self.inputs) * self.num_of_copies} outputs,\
got {len(self.outputs)}")
if len(self.inputs) != len(self.scatter_axes):
raise DaliugeException(\
f"expected {len(self.inputs)} axes,\
got {len(self.scatter_axes)}")

# split each input into num_of_copies parts along its scatter axis

for in_index in range(len(self.inputs)):
nObj = droputils.load_numpy(self.inputs[in_index])
try:
result = np.array_split(nObj, self.num_of_copies, axis=self.scatter_axes[in_index])
except IndexError as err:
raise err
for split_index in range(self.num_of_copies):
out_index = in_index * self.num_of_copies + split_index
droputils.save_numpy(self.outputs[out_index], result[split_index])


class SimpleBranch(BranchAppDrop, NullBarrierApp):
"""Simple branch app that is told the result of its condition"""

@@ -480,7 +548,6 @@ def condition(self):
return self.result



##
# @brief ListAppendThrashingApp\n
# @details A testing APP that appends a random integer to a list num times.
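The scatter semantics documented for GenericNpyScatterApp above come straight from numpy's np.array_split: when the array length is not divisible by the number of outputs, the leading sub-arrays absorb the remainder. A minimal standalone sketch of that behaviour (plain numpy, not part of this commit):

import numpy as np

# 10 elements scattered across 3 outputs: the first 10 % 3 = 1 sub-array gets
# one extra element, the remaining sub-arrays get 10 // 3 = 3 elements each.
data = np.arange(10)
parts = np.array_split(data, 3, axis=0)
print([len(p) for p in parts])  # [4, 3, 3]
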
5 changes: 3 additions & 2 deletions daliuge-engine/dlg/droputils.py
@@ -266,15 +266,16 @@ def listify(o):

def save_numpy(drop, ndarray: np.ndarray, allow_pickle=False):
"""
Saves a numpy ndarray to a drop
Saves a numpy ndarray to a drop in npy format
"""
bio = io.BytesIO()
np.save(bio, ndarray, allow_pickle=allow_pickle)
drop.write(bio.getbuffer())


def load_numpy(drop, allow_pickle=False) -> np.ndarray:
"""
Loads a numpy ndarray from a drop
Loads a numpy ndarray from a drop in npy format
"""
dropio = drop.getIO()
dropio.open(OpenMode.OPEN_READ)
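save_numpy and load_numpy round-trip an array through the npy format via an in-memory buffer before it touches the drop's I/O layer. A minimal sketch of that round trip using only numpy and io (the drop plumbing is omitted):

import io
import numpy as np

# Serialise to npy in memory, rewind the buffer, then deserialise again.
buf = io.BytesIO()
np.save(buf, np.arange(4), allow_pickle=False)
buf.seek(0)
restored = np.load(buf, allow_pickle=False)
assert np.array_equal(restored, np.arange(4))
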
4 changes: 2 additions & 2 deletions daliuge-engine/test/apps/test_simple.py
@@ -146,7 +146,7 @@ def test_helloworldapp(self):

def test_parallelHelloWorld(self):
m0 = InMemoryDROP("m0", "m0")
s = GenericScatterApp("s", "s")
s = GenericScatterApp("s", "s", num_of_copies=4)
greets = ["World", "Solar system", "Galaxy", "Universe"]
m0.write(pickle.dumps(greets))
s.addInput(m0)
@@ -194,7 +194,7 @@ def test_genericScatter(self):
data_in = random.randint(0, 100, size=100)
b = InMemoryDROP("b", "b")
b.write(pickle.dumps(data_in))
s = GenericScatterApp("s", "s")
s = GenericScatterApp("s", "s", num_of_copies=2)
s.addInput(b)
o1 = InMemoryDROP("o1", "o1")
o2 = InMemoryDROP("o2", "o2")
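The new GenericNpyScatterApp is wired up the same way as GenericScatterApp in the tests above, except the input is written with droputils.save_numpy rather than pickle. A hedged sketch of such a test body (the InMemoryDROP import path and the graph-execution/wait step are assumptions and are not shown in this diff):

import numpy as np
from dlg import droputils
from dlg.apps.simple import GenericNpyScatterApp
from dlg.drop import InMemoryDROP  # import path assumed for this daliuge version

b = InMemoryDROP("b", "b")
droputils.save_numpy(b, np.arange(100))  # npy-serialised input
s = GenericNpyScatterApp("s", "s", num_of_copies=2)
s.addInput(b)
o1 = InMemoryDROP("o1", "o1")
o2 = InMemoryDROP("o2", "o2")
s.addOutput(o1)
s.addOutput(o2)
# once b is marked completed and s runs, each output holds a 50-element array
# readable with droputils.load_numpy(o1) / droputils.load_numpy(o2)
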
12 changes: 8 additions & 4 deletions daliuge-translator/dlg/dropmake/dm_utils.py
@@ -466,11 +466,15 @@ def convert_construct(lgo):
if "group" in node:
app_node["group"] = node["group"]

for app_fd_name in ["appFields", "inputAppFields"]:
if app_fd_name in node:
for afd in node[app_fd_name]:
INPUT_APP_FIELDS = "inputAppFields"
if INPUT_APP_FIELDS in node:
# inputAppFields are converted to fields to be processed like
# regular application drops
app_node["fields"] = list(node[INPUT_APP_FIELDS])
app_node["fields"] += node["fields"]
# TODO: remove, use fields list
for afd in node[INPUT_APP_FIELDS]:
app_node[afd["name"]] = afd["value"]
break

if node["category"] == Categories.GATHER:
app_node["group_start"] = 1
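The inputAppFields handling above replaces the old key-by-key copy with a proper fields list on the generated application node, while keeping the flattened attributes for the legacy lookup path. A standalone sketch of the resulting transformation (the field names are hypothetical, chosen only to show the shape of the data):

node = {
    "inputAppFields": [{"name": "num_of_copies", "value": 4}],
    "fields": [{"name": "execution_time", "value": 5}],
}
app_node = {}
if "inputAppFields" in node:
    # inputAppFields become regular fields on the application drop...
    app_node["fields"] = list(node["inputAppFields"])
    app_node["fields"] += node["fields"]
    # ...and each one is also flattened onto the node for backwards compatibility
    for afd in node["inputAppFields"]:
        app_node[afd["name"]] = afd["value"]
# app_node["fields"][0]["name"] == "num_of_copies"; app_node["num_of_copies"] == 4
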
