Skip to content

Commit

Permalink
Merge a3f796b into 53ce2c1
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed Feb 2, 2022
2 parents 53ce2c1 + a3f796b commit 601f68f
Show file tree
Hide file tree
Showing 2 changed files with 620 additions and 574 deletions.
92 changes: 88 additions & 4 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
# MA 02111-1307 USA
#
"""Applications used as examples, for testing, or in simple situations"""
from numbers import Number
import pickle
import random
from typing import List
from typing import List, Optional
import urllib.error
import urllib.request

Expand Down Expand Up @@ -254,8 +255,8 @@ class AverageArraysApp(BarrierAppDROP):
from numpy import mean, median

component_meta = dlg_component(
"RandomArrayApp",
"Random Array App.",
"AverageArraysApp",
"Average Array App.",
[dlg_batch_input("binary/*", [])],
[dlg_batch_output("binary/*", [])],
[dlg_streaming_input("binary/*")],
Expand Down Expand Up @@ -305,11 +306,94 @@ def getInputArrays(self):
self.marray = marray

def averageArray(self):

method_to_call = getattr(np, self.method)
return method_to_call(self.marray, axis=0)


##
# @brief GenericNpyGatherApp
# @details A BarrierAppDrop that combines one or more inputs using cummulative operations.
# @param category PythonApp
# @param[in] param/appclass Application Class/dlg.apps.simple.GenericNpyGatherApp/String/readonly/
# \~English Application class
# @param[in] param/function Function/sum/String/readwrite/
# \~English The function used for gathering
# @param[in] param/function reduce_axes/None/String/readonly/
# \~English The ndarray axes to reduce, None reduces all axes for sum, prod, max, min functions
# @param[in] port/array Array/npy/
# \~English Port for the input array(s)
# @param[out] port/array Array/npy/
# \~English Port carrying the reduced array
# @par EAGLE_END
class GenericNpyGatherApp(BarrierAppDROP):
"""
A BarrierAppDrop that reduces then gathers one or more inputs using cummulative operations.
function: string <['sum']|'prod'|'min'|'max'|'add'|'multiply'|'maximum'|'minimum'>.
"""
component_meta = dlg_component(
"GenericNpyGatherApp",
"Generic Npy Gather App.",
[dlg_batch_input("binary/*", [])],
[dlg_batch_output("binary/*", [])],
[dlg_streaming_input("binary/*")],
)

# reduce and combine operation pair names
functions = {
# reduce and gather e.g. output dimension is reduces
"sum": "add", # sum reduction of inputs along an axis first then reduces across drops
"prod": "multiply", # prod reduction of inputs along an axis first then reduces across drops
"max": "maximum", # max reduction of input along an axis first then reduces across drops
"min": "minimum", # min reduction of input along an axis first then reduces across drops

# gather only
"add": None, # elementwise addition of inputs, ndarrays must be of same shape
"multiply": None, # elementwise multiplication of inputs, ndarrays must be of same shape
"maximum": None, # elementwise maximums of inputs, ndarrays must be of same shape
"minimum": None # elementwise minimums of inputs, ndarrays must be of same shape
}
function: str = dlg_string_param("function", "sum")
reduce_axes: str = dlg_string_param("reduce_axes", "None")

def __init__(self, oid, uid, **kwargs):
super().__init__(oid, kwargs)

def initialize(self, **kwargs):
super(GenericNpyGatherApp, self).initialize(**kwargs)
self.reduce_axes = ast.literal_eval(self.reduce_axes)

def run(self):
if len(self.inputs) < 1:
raise Exception(f"At least one input should have been added to {self}")
if len(self.outputs) < 1:
raise Exception(f"At least one output should have been added to {self}")
if self.function not in self.functions:
raise Exception(f"Function {self.function} not supported by {self}")

result = self.reduce_combine_inputs() if self.functions[self.function] is not None else self.combine_inputs()
for o in self.outputs:
droputils.save_numpy(o, result)

def reduce_combine_inputs(self):
result: Optional[Number] = None
reduce = getattr(np, f"{self.function}")
combine = getattr(np, f"{self.functions[self.function]}")
for input in self.inputs:
data = droputils.load_numpy(input)
result = reduce(data, axis=self.reduce_axes)\
if result is None\
else combine(result, reduce(data, axis=self.reduce_axes))
return result

def combine_inputs(self):
result: Optional[Number] = None
combine = getattr(np, f"{self.functions[self.function]}")
for input in self.inputs:
data = droputils.load_numpy(input)
result = data if result is None else combine(result, data)
return result


##
# @brief HelloWorldApp
# @details A simple APP that implements the standard Hello World in DALiuGE.
Expand Down
Loading

0 comments on commit 601f68f

Please sign in to comment.