Skip to content

Commit

Permalink
Merge 503e058 into 84188b4
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed Jul 19, 2021
2 parents 84188b4 + 503e058 commit 6bd8f16
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 5 deletions.
52 changes: 51 additions & 1 deletion daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,56 @@ def run(self):
o.len = len(content)
o.write(content) # send content to all outputs

##
# @brief GenericSplitApp\n
# @details An APP that splits about any object that can be converted to a numpy array
# into as many parts as the app has outputs, provided that the initially converted numpy
# array has enough elements. The return will be a numpy array of arrays, where the first
# axis is of length len(outputs). The modulo remainder of the length of the original array and
# the number of outputs will be distributed across the first len(outputs)-1 elements of the
# resulting array.
# @par EAGLE_START
# @param gitrepo $(GIT_REPO)
# @param version $(PROJECT_VERSION)
# @param category PythonApp
# @param[in] param/appclass/dlg.apps.simple.GenericSplitApp/String/readonly
# \~English Application class\n
# @param[out] port/content
# \~English The port carrying the content read from the URL.
# @par EAGLE_END


class GenericScatterApp(BarrierAppDROP):
"""
An APP that splits an object that has a len attribute into <numSplit> parts and
returns a numpy array of arrays, where the first axis is of length <numSplit>.
"""
compontent_meta = dlg_component('GenericScatterApp', 'Scatter an array like object into numSplit parts',
[dlg_batch_input('binary/*', [])],
[dlg_batch_output('binary/*', [])],
[dlg_streaming_input('binary/*')])

def initialize(self, **kwargs):
super(GenericScatterApp, self).initialize(**kwargs)

def run(self):
# split it as many times as we have outputs
numSplit = len(self.outputs)
inpArray = pickle.loads(droputils.allDropContents(self.inputs[0]))
try: # just checking whether the object is some object that can be used as an array
nObj = np.array(inpArray)
except:
raise
try:
result = np.array_split(nObj, numSplit)
except IndexError as err:
raise err
for i in range(numSplit):
o = self.outputs[i]
d = pickle.dumps(result[i])
o.len = len(d)
o.write(d) # average across inputs

class SimpleBranch(BranchAppDrop, NullBarrierApp):
"""Simple branch app that is told the result of its condition"""

Expand All @@ -385,4 +435,4 @@ def run(self):
pass

def condition(self):
return self.result
return self.result
3 changes: 1 addition & 2 deletions daliuge-engine/dlg/manager/web/static/css/progressBar.css
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
.node rect, .node polygon {
stroke-width: 2.0px;
stroke: #bbb;
height:100%;
}

/* DROP states */
Expand Down Expand Up @@ -45,4 +44,4 @@

.node.error :first-child, rect.error {
fill: #e44f33;
}
}
20 changes: 18 additions & 2 deletions daliuge-engine/test/apps/test_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import os
import pickle
import unittest
from numpy import random, mean, array
from numpy import random, mean, array, concatenate


from dlg import droputils
from dlg.droputils import DROPWaiterCtx
from dlg.apps.simple import SleepApp, CopyApp, SleepAndCopyApp
from dlg.apps.simple import GenericScatterApp, SleepApp, CopyApp, SleepAndCopyApp
from dlg.apps.simple import RandomArrayApp, AverageArraysApp, HelloWorldApp
from dlg.ddap_protocol import DROPStates
from dlg.drop import NullDROP, InMemoryDROP, FileDROP, NgasDROP
Expand Down Expand Up @@ -149,6 +149,22 @@ def test_ngasio(self):
self._test_graph_runs((nd_in,b,i,nd_out),nd_in, nd_out, timeout=4)
self.assertEqual(b"Hello World", droputils.allDropContents(i))

def test_genericScatter(self):
data_in = random.randint(0, 100, size=100)
b = InMemoryDROP('b', 'b')
b.write(pickle.dumps(data_in))
s = GenericScatterApp('s', 's')
s.addInput(b)
o1 = InMemoryDROP('o1', 'o1')
o2 = InMemoryDROP('o2', 'o2')
for x in o1, o2:
s.addOutput(x)
self._test_graph_runs((b, s, o1, o2), b, (o1, o2), timeout=4)

data1 = pickle.loads(droputils.allDropContents(o1))
data2 = pickle.loads(droputils.allDropContents(o2))
data_out = concatenate([data1, data2])
self.assertEqual(data_in.all(), data_out.all())



Expand Down

0 comments on commit 6bd8f16

Please sign in to comment.