Skip to content

Commit

Permalink
Merge f2d9ab3 into baea9b8
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed Dec 8, 2021
2 parents baea9b8 + f2d9ab3 commit 6f619f7
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 29 deletions.
31 changes: 24 additions & 7 deletions daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@
import re
import threading
import traceback
import numpy as np

from .ddap_protocol import DROPStates
from .drop import AppDROP, AbstractDROP
from .apps.dockerapp import DockerApp
from .io import IOForURL, OpenMode
from . import common
from .common import DropType

from dlg.ddap_protocol import DROPStates
from dlg.drop import AppDROP, AbstractDROP
from dlg.io import IOForURL, OpenMode
from dlg import common
from dlg.common import DropType

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -266,6 +265,24 @@ def listify(o):
return [o]


def save_numpy(drop, ndarray: np.ndarray, allow_pickle=False):
"""
Saves a numpy ndarray to a drop
"""
bio = io.BytesIO()
np.save(bio, ndarray, allow_pickle=allow_pickle)
drop.write(bio.getbuffer())

def load_numpy(drop, allow_pickle=False) -> np.ndarray:
"""
Loads a numpy ndarray from a drop
"""
dropio = drop.getIO()
dropio.open(OpenMode.OPEN_READ)
res = np.load(io.BytesIO(dropio.buffer()), allow_pickle=allow_pickle)
dropio.close()
return res

class DROPFile(object):
"""
A file-like object (currently only supporting the read() operation, more to
Expand Down
16 changes: 11 additions & 5 deletions daliuge-engine/dlg/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ def exists(self):
def delete(self):
self._buf.close()

def buffer(self):
@overrides
def buffer(self) -> memoryview:
return self._open().getbuffer()


Expand All @@ -265,9 +266,9 @@ def __init__(self, filename, **kwargs):
super(FileIO, self).__init__()
self._fnm = filename

def _open(self, **kwargs):
flag = "r" if self._mode is OpenMode.OPEN_READ else "w"
flag += "b"
def _open(self, **kwargs) -> io.BufferedReader:
flag = 'r' if self._mode is OpenMode.OPEN_READ else 'w'
flag += 'b'
try:
return open(self._fnm, flag)
except FileNotFoundError:
Expand Down Expand Up @@ -581,7 +582,8 @@ def exists(self):
def delete(self):
pass

def buffer(self):
@overrides
def buffer(self) -> memoryview:
[data] = self._desc.get_buffers([self._object_id])
return memoryview(data)

Expand Down Expand Up @@ -644,3 +646,7 @@ def exists(self):

def delete(self):
pass

@overrides
def buffer(self) -> memoryview:
return self._desc.get(self._object_id, self._flight_path)
44 changes: 27 additions & 17 deletions daliuge-translator/dlg/dropmake/pg_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,10 @@ def is_loop(self):
return self.is_group() and self._jd["category"] == Categories.LOOP

def is_service(self):
# services nodes are replaced with the input application in the logical graph
return ("isService" in self._jd and self._jd["isService"]) or self._jd[
"category"
] == Categories.SERVICE
"""
Determines whether a node the parent service node (not the input application)
"""
return self._jd["category"] == Categories.SERVICE

def is_groupby(self):
return self._jd["category"] == Categories.GROUP_BY
Expand Down Expand Up @@ -564,7 +564,7 @@ def _update_key_value_attributes(self, kwargs):
else:
kwargs[k_v[0]] = k_v[1]

def _create_test_drop_spec(self, oid, rank, kwargs):
def _create_test_drop_spec(self, oid, rank, kwargs) -> dropdict:
"""
TODO
This is a test function only
Expand Down Expand Up @@ -623,11 +623,7 @@ def _create_test_drop_spec(self, oid, rank, kwargs):
kwargs["filepath"] = fp
self._update_key_value_attributes(kwargs)
drop_spec.update(kwargs)
elif drop_type in [
Categories.COMPONENT,
Categories.PYTHON_APP,
Categories.BRANCH,
]:
elif drop_type in [Categories.COMPONENT, Categories.PYTHON_APP, Categories.BRANCH]:
# default generic component becomes "sleep and copy"
if "appclass" not in self.jd or len(self.jd["appclass"]) == 0:
app_class = "dlg.apps.simple.SleepApp"
Expand Down Expand Up @@ -655,11 +651,13 @@ def _create_test_drop_spec(self, oid, rank, kwargs):
drop_spec = dropdict(
{"oid": oid, "type": DropType.APP, "app": app_class, "rank": rank}
)

kwargs["num_cpus"] = int(self.jd.get("num_cpus", 1))
if "mkn" in self.jd:
kwargs["mkn"] = self.jd["mkn"]
self._update_key_value_attributes(kwargs)
drop_spec.update(kwargs)

elif drop_type in [Categories.DYNLIB_APP, Categories.DYNLIB_PROC_APP]:
if "libpath" not in self.jd or len(self.jd["libpath"]) == 0:
raise GraphException("Missing 'libpath' in Drop {0}".format(self.text))
Expand Down Expand Up @@ -812,8 +810,21 @@ def _create_test_drop_spec(self, oid, rank, kwargs):
kwargs["sleepTime"] = 1
drop_spec.addOutput(dropSpec_gather)
dropSpec_gather.addProducer(drop_spec)
elif drop_type == Categories.SERVICE:
raise GraphException(f"DROP type: {drop_type} should not appear in physical graph")
# drop_spec = dropdict(
# {
# "oid": oid,
# "type": DropType.SERVICE_APP,
# "app": "dlg.apps.simple.SleepApp",
# "rank": rank
# }
# )
# kwargs["tw"] = 1

# elif drop_type == Categories.BRANCH:
# Branches are now dealt with like any other application and essentially ignored by the translator.

elif drop_type in [Categories.START, Categories.END]:
# this is at least suspicious in terms of implementation....
drop_spec = dropdict(
Expand Down Expand Up @@ -843,6 +854,8 @@ def make_single_drop(self, iid="0", **kwargs):
kwargs["lg_key"] = self.id
kwargs["dt"] = self.jd["category"]
kwargs["nm"] = self.text
if "isService" in self.jd and self.jd["isService"]:
kwargs["type"] = DropType.SERVICE_APP
dropSpec.update(kwargs)
return dropSpec

Expand Down Expand Up @@ -2081,7 +2094,8 @@ def lgn_to_pgn(self, lgn, iid="0", lpcxt=None):
lpcxt: Loop context
"""
if lgn.is_group():
extra_links_drops = not lgn.is_scatter() and not lgn.is_service()
# services nodes are replaced with the input application in the logical graph
extra_links_drops = not lgn.is_scatter()
if extra_links_drops:
non_inputs = []
grp_starts = []
Expand Down Expand Up @@ -2173,13 +2187,9 @@ def get_child_lp_ctx(idx):
src_drop = lgn.make_single_drop(miid, loop_cxt=lpcxt, proc_index=i)
self._drop_dict[lgn.id].append(src_drop)
elif lgn.is_service():
src_drop = lgn.make_single_drop(iid, loop_cxt=lpcxt)
src_drop["type"] = DropType.SERVICE_APP
self._drop_dict[lgn.id].append(src_drop)
if lgn.is_start_listener():
self._drop_dict["new_added"].append(src_drop["listener_drop"])
# no action required, inputapp node aleady created and marked with "isService"
pass
else:
# TODO !!
src_drop = lgn.make_single_drop(iid, loop_cxt=lpcxt)
self._drop_dict[lgn.id].append(src_drop)
if lgn.is_start_listener():
Expand Down

0 comments on commit 6f619f7

Please sign in to comment.