Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YAN-858 Frequency Scattering of Visibilities #94

Merged
merged 7 commits into from
Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 71 additions & 4 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
"""Applications used as examples, for testing, or in simple situations"""
import pickle
import random
from typing import List
import urllib.error
import urllib.request

import time
import ast
import numpy as np

from dlg import droputils, utils
Expand All @@ -40,7 +42,7 @@
dlg_batch_output,
dlg_streaming_input
)

from dlg.exceptions import DaliugeException
from dlg.apps.pyfunc import serialize_data, deserialize_data


Expand Down Expand Up @@ -439,12 +441,14 @@ class GenericScatterApp(BarrierAppDROP):
[dlg_streaming_input("binary/*")],
)

# automatically populated by scatter node
num_of_copies: int = dlg_int_param("num_of_copies", 1)

def initialize(self, **kwargs):
super(GenericScatterApp, self).initialize(**kwargs)

def run(self):
# split it as many times as we have outputs
numSplit = len(self.outputs)
numSplit = self.num_of_copies
cont = droputils.allDropContents(self.inputs[0])
# if the data is of type string it is not pickled, but stored as a binary string.
try:
Expand All @@ -466,6 +470,70 @@ def run(self):
o.write(d) # average across inputs


##
# @brief GenericNpyScatterApp
# @details An APP that splits any npy-format data drop along a given axis
# into as many parts as the app has outputs, provided that the initially converted numpy
# array has enough elements. The return will be a numpy array of arrays, where the first
# axis is of length len(outputs). The modulo remainder of the length of the original array and
# the number of outputs will be distributed across the first len(outputs)-1 elements of the
# resulting array.
# @par EAGLE_START
# @param category PythonApp
# @param[in] param/appclass Application Class/dlg.apps.simple.GenericNpyScatterApp/String/readonly/
# \~English Application class
# @param[in] param/scatter_axes Scatter Axes/String/readwrite
# \~English The axes to split input ndarrays on, e.g. [0,0,0], length must
# match the number of input ports
# @param[out] port/array Array/npy/
# \~English A numpy array of arrays
# @par EAGLE_END
class GenericNpyScatterApp(BarrierAppDROP):
    """
    An APP that loads each input drop as a numpy ndarray and splits it into
    <num_of_copies> parts along the axis configured for that input, writing
    each part to one output drop in npy format.
    """

    component_meta = dlg_component(
        "GenericNpyScatterApp",
        "Scatter an array like object into <num_of_copies> parts",
        [dlg_batch_input("binary/*", [])],
        [dlg_batch_output("binary/*", [])],
        [dlg_streaming_input("binary/*")],
    )

    # automatically populated by scatter node
    num_of_copies: int = dlg_int_param("num_of_copies", 1)
    # arrives as a string (e.g. "[0,0]"), one axis per input port;
    # parsed into a list of ints in initialize()
    scatter_axes: List[int] = dlg_string_param("scatter_axes", "[0]")

    def initialize(self, **kwargs):
        super(GenericNpyScatterApp, self).initialize(**kwargs)
        # literal_eval (not eval) safely converts the string parameter
        # into the actual list of axis indices
        self.scatter_axes = ast.literal_eval(self.scatter_axes)

    def run(self):
        """Split every input ndarray and distribute the chunks to the outputs.

        Raises:
            DaliugeException: if the number of outputs is not
                len(inputs) * num_of_copies, or if the number of scatter
                axes does not match the number of inputs.
        """
        expected_outputs = len(self.inputs) * self.num_of_copies
        if expected_outputs != len(self.outputs):
            raise DaliugeException(
                f"expected {expected_outputs} outputs, "
                f"got {len(self.outputs)}"
            )
        if len(self.inputs) != len(self.scatter_axes):
            raise DaliugeException(
                f"expected {len(self.inputs)} axes, "
                f"got {len(self.scatter_axes)}"
            )

        # split each input <num_of_copies> times along its configured axis;
        # np.array_split distributes any modulo remainder over the first
        # chunks, so uneven splits are allowed
        for in_index in range(len(self.inputs)):
            nObj = droputils.load_numpy(self.inputs[in_index])
            result = np.array_split(
                nObj, self.num_of_copies, axis=self.scatter_axes[in_index]
            )
            # outputs are grouped per input: input i owns outputs
            # [i * num_of_copies, (i + 1) * num_of_copies)
            for split_index in range(self.num_of_copies):
                out_index = in_index * self.num_of_copies + split_index
                droputils.save_numpy(self.outputs[out_index], result[split_index])


class SimpleBranch(BranchAppDrop, NullBarrierApp):
"""Simple branch app that is told the result of its condition"""

Expand All @@ -480,7 +548,6 @@ def condition(self):
return self.result



##
# @brief ListAppendThrashingApp\n
# @details A testing APP that appends a random integer to a list num times.
Expand Down
5 changes: 3 additions & 2 deletions daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,15 +266,16 @@ def listify(o):

def save_numpy(drop, ndarray: np.ndarray, allow_pickle=False):
    """
    Serialises *ndarray* into *drop* using numpy's binary .npy format
    """
    buffer = io.BytesIO()
    np.save(buffer, ndarray, allow_pickle=allow_pickle)
    drop.write(buffer.getbuffer())


def load_numpy(drop, allow_pickle=False) -> np.ndarray:
"""
Loads a numpy ndarray from a drop
Loads a numpy ndarray from a drop in npy format
"""
dropio = drop.getIO()
dropio.open(OpenMode.OPEN_READ)
Expand Down
65 changes: 57 additions & 8 deletions daliuge-engine/test/apps/test_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,23 @@
import time
import unittest
from multiprocessing.pool import ThreadPool
from numpy import random, mean, array, concatenate

from numpy import random, mean, array, concatenate, random, testing
from psutil import cpu_count

from dlg import droputils
from dlg.apps.simple import GenericScatterApp, SleepApp, CopyApp, SleepAndCopyApp, \
from dlg.apps.simple import (
GenericScatterApp,
GenericNpyScatterApp,
SleepApp,
CopyApp,
SleepAndCopyApp,
ListAppendThrashingApp
)
from dlg.apps.simple import RandomArrayApp, AverageArraysApp, HelloWorldApp
from dlg.ddap_protocol import DROPStates
from dlg.drop import NullDROP, InMemoryDROP, FileDROP, NgasDROP

if sys.version_info >= (3, 8):
from dlg.manager.shared_memory_manager import DlgSharedMemoryManager
from numpy import random, mean, array, concatenate
from psutil import cpu_count

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -146,7 +149,7 @@ def test_helloworldapp(self):

def test_parallelHelloWorld(self):
m0 = InMemoryDROP("m0", "m0")
s = GenericScatterApp("s", "s")
s = GenericScatterApp("s", "s", num_of_copies=4)
greets = ["World", "Solar system", "Galaxy", "Universe"]
m0.write(pickle.dumps(greets))
s.addInput(m0)
Expand Down Expand Up @@ -194,7 +197,7 @@ def test_genericScatter(self):
data_in = random.randint(0, 100, size=100)
b = InMemoryDROP("b", "b")
b.write(pickle.dumps(data_in))
s = GenericScatterApp("s", "s")
s = GenericScatterApp("s", "s", num_of_copies=2)
s.addInput(b)
o1 = InMemoryDROP("o1", "o1")
o2 = InMemoryDROP("o2", "o2")
Expand All @@ -207,6 +210,52 @@ def test_genericScatter(self):
data_out = concatenate([data1, data2])
self.assertEqual(data_in.all(), data_out.all())

def test_genericNpyScatter(self):
    """A single ndarray input scattered over two outputs must reassemble
    to the original array when the output chunks are concatenated."""
    data_in = random.rand(100, 100)
    b = InMemoryDROP("b", "b")
    droputils.save_numpy(b, data_in)
    s = GenericNpyScatterApp("s", "s", num_of_copies=2)
    s.addInput(b)
    o1 = InMemoryDROP("o1", "o1")
    o2 = InMemoryDROP("o2", "o2")
    for x in o1, o2:
        s.addOutput(x)
    self._test_graph_runs((b, s, o1, o2), b, (o1, o2), timeout=4)

    data1 = droputils.load_numpy(o1)
    data2 = droputils.load_numpy(o2)
    data_out = concatenate([data1, data2])
    # element-wise comparison; the previous
    # assertEqual(data_in.all(), data_out.all()) only compared two
    # booleans and would pass even for corrupted array contents
    testing.assert_array_equal(data_out, data_in)

def test_genericNpyScatter_multi(self):
    """Two ndarray inputs, each split in two along axis 0, must fan out to
    four outputs that reassemble to the respective originals."""
    data1_in = random.rand(100, 100)
    data2_in = random.rand(100, 100)
    b = InMemoryDROP("b", "b")
    c = InMemoryDROP("c", "c")
    droputils.save_numpy(b, data1_in)
    droputils.save_numpy(c, data2_in)
    s = GenericNpyScatterApp("s", "s", num_of_copies=2, scatter_axes="[0,0]")
    s.addInput(b)
    s.addInput(c)
    o1 = InMemoryDROP("o1", "o1")
    o2 = InMemoryDROP("o2", "o2")
    o3 = InMemoryDROP("o3", "o3")
    o4 = InMemoryDROP("o4", "o4")
    for x in o1, o2, o3, o4:
        s.addOutput(x)
    # NOTE(review): `c` was previously missing from the drop list even
    # though it is passed as a root — included here so its completion is
    # tracked like the other drops; confirm against _test_graph_runs
    self._test_graph_runs((b, c, s, o1, o2, o3, o4), (b, c), (o1, o2, o3, o4), timeout=4)

    # outputs o1/o2 belong to input b, o3/o4 to input c
    data11 = droputils.load_numpy(o1)
    data12 = droputils.load_numpy(o2)
    data1_out = concatenate([data11, data12])
    self.assertEqual(data1_out.shape, data1_in.shape)
    testing.assert_array_equal(data1_out, data1_in)

    data21 = droputils.load_numpy(o3)
    data22 = droputils.load_numpy(o4)
    data2_out = concatenate([data21, data22])
    testing.assert_array_equal(data2_out, data2_in)

def test_listappendthrashing(self, size=1000):
a = InMemoryDROP('a', 'a')
b = ListAppendThrashingApp('b', 'b', size=size)
Expand Down
12 changes: 8 additions & 4 deletions daliuge-translator/dlg/dropmake/dm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,11 +466,15 @@ def convert_construct(lgo):
if "group" in node:
app_node["group"] = node["group"]

for app_fd_name in ["appFields", "inputAppFields"]:
if app_fd_name in node:
for afd in node[app_fd_name]:
INPUT_APP_FIELDS = "inputAppFields"
if INPUT_APP_FIELDS in node:
# inputAppFields are converted to fields to be processed like
# regular application drops
app_node["fields"] = list(node[INPUT_APP_FIELDS])
Copy link
Collaborator Author

@calgray calgray Jan 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found this line is needed since app parameters are meant to exist under "fields" in the logical graph. These fields come from the app view in the inputApplication dropdown. The other fields, I think, need some careful consideration. A logical graph schema would help a lot.

app_node["fields"] += node["fields"]
# TODO: remove, use fields list
for afd in node[INPUT_APP_FIELDS]:
app_node[afd["name"]] = afd["value"]
break

if node["category"] == Categories.GATHER:
app_node["group_start"] = 1
Expand Down