Skip to content

Commit

Permalink
Merge pull request #251 from ICRAR/eagle-1184
Browse files Browse the repository at this point in the history
Eagle 1184
  • Loading branch information
awicenec committed May 13, 2024
2 parents 79be968 + 09eb7d0 commit 51b7ea0
Show file tree
Hide file tree
Showing 45 changed files with 206 additions and 355 deletions.
60 changes: 57 additions & 3 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
dlg_bool_param,
dlg_int_param,
dlg_list_param,
dlg_dict_param,
dlg_component,
dlg_batch_input,
dlg_batch_output,
Expand Down Expand Up @@ -432,6 +433,50 @@ class GenericGatherApp(BarrierAppDROP):
def initialize(self, **kwargs):
    """Delegate initialisation to the BarrierAppDROP base class."""
    super().initialize(**kwargs)

def readWriteData(self):
    """Concatenate the raw content of every input drop into every output drop.

    For each output, the complete content of each input is read via
    ``droputils.allDropContents`` and written straight through, so each
    output receives the inputs' bytes back to back.

    Fixes: removed the unused ``total_len`` accumulator and stopped
    shadowing the ``input`` builtin.
    """
    # NOTE(review): inputs are re-read once per output; fine for
    # memory-backed drops, but confirm for large/streaming drops.
    for out_drop in self.outputs:
        for in_drop in self.inputs:
            out_drop.write(droputils.allDropContents(in_drop))

def run(self):
    """BarrierAppDROP entry point: gather all inputs into the outputs."""
    self.readWriteData()


##
# @brief DictGatherApp
# @details App packs all data on input into a dictionary, using the input drops' names as keys and
# reading the dictionary values from the input drops. This app can be used stand-alone without a gather construct.
# @par EAGLE_START
# @param category PythonApp
# @param tag daliuge
# @param value_dict value_dict/Json/ApplicationArgument/NoPort/ReadWrite//False/False/The value dictionary can be initialized here
# @param dropclass dlg.apps.simple.DictGatherApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
# @param input /Object/ApplicationArgument/InputPort/ReadWrite//False/False/0-base placeholder port for inputs
# @param output /Object/ApplicationArgument/OutputPort/ReadWrite//False/False/Placeholder port for outputs
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
# @par EAGLE_END
class DictGatherApp(BarrierAppDROP):
component_meta = dlg_component(
"DictGatherApp",
"Collect multiple inputs into a dictionary",
[dlg_batch_input("binary/*", [])],
[dlg_batch_output("binary/*", [])],
[dlg_streaming_input("binary/*")],
)
value_dict = dlg_dict_param("value_dict", {})

def initialize(self, **kwargs):
    """Initialise the drop and retain kwargs for later lookup.

    The kwargs are kept because ``readWriteData`` reads
    ``kwargs["applicationArgs"]`` to merge application arguments into the
    gathered dictionary.
    """
    super(DictGatherApp, self).initialize(**kwargs)
    # Kept for readWriteData's applicationArgs merge.
    self.kwargs = kwargs

def readWriteData(self):
    """Gather all inputs into a dictionary and write it to every output.

    Each input drop's content is unpickled and stored in
    ``self.value_dict`` under that input's name.  Application arguments
    that carry a value and are not already present are merged in as
    well.  The resulting dictionary is pickled and written to each
    output.

    Fixes: uses ``kwargs.get(...)`` so a missing ``applicationArgs`` key
    no longer raises ``KeyError``; stops shadowing the ``input`` builtin.
    """
    for out_drop in self.outputs:
        for in_drop in self.inputs:
            value = droputils.allDropContents(in_drop)
            # SECURITY NOTE(review): pickle.loads on drop content is only
            # safe for trusted graphs — do not feed untrusted data here.
            self.value_dict[in_drop.name] = pickle.loads(value)
        # Merge application arguments that have a value and were not
        # already supplied by an input drop.
        for aa_key, aa_dict in self.kwargs.get("applicationArgs", {}).items():
            if aa_key not in self.value_dict and aa_dict["value"]:
                self.value_dict[aa_key] = aa_dict["value"]
        logger.debug("Writing %s to %s", self.value_dict, out_drop.name)
        out_drop.write(pickle.dumps(self.value_dict))

def run(self):
    """BarrierAppDROP entry point: build and write the gathered dict."""
    self.readWriteData()
Expand Down
5 changes: 5 additions & 0 deletions daliuge-engine/dlg/data/drops/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ def parse_pydata(pd_dict: dict) -> bytes:
pydata = json.loads(pydata)
except:
pydata = pydata.encode()
if pd_dict["type"].lower() == "eval":
# try:
pydata = eval(pydata)
# except:
# pydata = pydata.encode()
elif pd_dict["type"].lower() == "int":
try:
pydata = int(pydata)
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/data/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ def _write(self, data, **kwargs) -> int:
self._buf += data
else:
self._desc.send(data)
logger.debug("Wrote %s bytes", len(data))
# logger.debug("Wrote %s bytes", len(data))
return len(data)

def exists(self) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/graph_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def loadDropSpecs(dropSpecList):
if oid in dropSpecs:
dropSpecs[oid]
else:
continue
raise KeyError

# N-1 relationships
elif rel in __TOONE:
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/manager/composite_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def replicate(self, sessionId, f, action, collect=None, iterable=None, port=None
iterable,
)
if thrExs:
msg = f"More than one error occurred while {action} on session {sessionId}"
msg = f"ERRROR(s) occurred while {action} for session {sessionId}"
raise SubManagerException(msg, thrExs)

#
Expand Down Expand Up @@ -606,4 +606,4 @@ def __init__(self, dmHosts=[], pkeyPath=None, dmCheckTimeout=10):
pkeyPath=pkeyPath,
dmCheckTimeout=dmCheckTimeout,
)
logger.info("Created MasterManager for hosts: %r", self._dmHosts)
logger.info("Created MasterManager for DIM hosts: %r", self._dmHosts)
9 changes: 6 additions & 3 deletions daliuge-engine/dlg/manager/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ def fwrapper(*args, **kwargs):
origin = bottle.request.headers.raw("Origin")
logger.debug("CORS request comming from: %s", origin)
if origin is None or re.match(
r"http://(dlg-trans.local:80[0-9][0-9]|dlg-trans.icrar.org)", origin
r"(http://dlg-trans.local:80[0-9][0-9]|https://dlg-trans.icrar.org)",
origin,
):
origin = "http://dlg-trans.local:8084"
pass
elif re.match(r"http://((localhost)|(127.0.0.1)):80[0-9][0-9]", origin):
origin = "http://localhost:8084"
bottle.response.headers["Access-Control-Allow-Origin"] = origin
Expand Down Expand Up @@ -122,9 +123,11 @@ def fwrapper(*args, **kwargs):
eargs = {}
# args[1] is a dictionary of host:exception
for host, subex in e.args[1].items():
logger.debug(">>>> Error class name: %s", subex.__class__.__name__)
eargs[host] = {
"type": subex.__class__.__name__,
"args": subex.args,
# "args": subex.args,
"args": "dummy",
}
elif isinstance(e, DaliugeException):
status, eargs = 555, e.args
Expand Down
70 changes: 22 additions & 48 deletions daliuge-engine/dlg/manager/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,23 +287,19 @@ def addGraphSpec(self, graphSpec):
self.status = SessionStates.BUILDING

# This will check the consistency of each dropSpec
logger.debug("Trying to add graphSpec: %s", [[x['oid'],x['node']] for x in graphSpec])
graphSpecDict, self._graphreprodata = graph_loader.loadDropSpecs(
graphSpec
)
# logger.debug("Trying to add graphSpec: %s", [x.keys() for x in graphSpec])
logger.debug("Trying to add graphSpec: %s", graphSpec)
graphSpecDict, self._graphreprodata = graph_loader.loadDropSpecs(graphSpec)
# Check for duplicates
duplicates = set(graphSpecDict) & set(self._graph)
if duplicates:
raise InvalidGraphException(
"Trying to add drops with OIDs that already exist: %r"
% (duplicates,)
"Trying to add drops with OIDs that already exist: %r" % (duplicates,)
)

self._graph.update(graphSpecDict)

logger.debug(
"Added a graph definition with %d DROPs", len(graphSpecDict)
)
logger.debug("Added a graph definition with %d DROPs", len(graphSpecDict))

@track_current_session
def linkGraphParts(self, lhOID, rhOID, linkType, force=False):
Expand All @@ -329,9 +325,7 @@ def linkGraphParts(self, lhOID, rhOID, linkType, force=False):
missingOids.append(rhOID)
if missingOids:
oids = "OID" if len(missingOids) == 1 else "OIDs"
raise InvalidGraphException(
"No DROP found for %s %r" % (oids, missingOids)
)
raise InvalidGraphException("No DROP found for %s %r" % (oids, missingOids))

graph_loader.addLink(linkType, lhDropSpec, rhOID, force=force)

Expand All @@ -356,8 +350,7 @@ def deploy(self, completedDrops=[], event_listeners=[], foreach=None):
not self._graph and status != SessionStates.PRISTINE
):
raise InvalidSessionState(
"Can't deploy this session in its current status: %d"
% (status)
"Can't deploy this session in its current status: %d" % (status)
)

if not self._graph and completedDrops:
Expand Down Expand Up @@ -468,9 +461,7 @@ def _run(self, worker):
def trigger_drops(self, uids):
for drop, downStreamDrops in droputils.breadFirstTraverse(self._roots):
downStreamDrops[:] = [
dsDrop
for dsDrop in downStreamDrops
if isinstance(dsDrop, AbstractDROP)
dsDrop for dsDrop in downStreamDrops if isinstance(dsDrop, AbstractDROP)
]
if drop.uid in uids:
if isinstance(drop, InputFiredAppDROP):
Expand Down Expand Up @@ -547,9 +538,7 @@ def add_node_subscriptions(self, relationships):
remote_uid = rel.rhs
mname = LINKTYPE_1TON_BACK_APPEND_METHOD[rel.rel]

self._proxyinfo.append(
(host, rpc_port, local_uid, mname, remote_uid)
)
self._proxyinfo.append((host, rpc_port, local_uid, mname, remote_uid))

def append_reprodata(self, oid, reprodata):
if oid in self._graph:
Expand All @@ -561,30 +550,26 @@ def append_reprodata(self, oid, reprodata):
drop_reprodata = reprodata.get("data", {})
drop_hashes = reprodata.get("merkleroot", {})
for rmode in ALL_RMODES:
self._graph[oid]["reprodata"][rmode.name][
"rg_data"
] = drop_reprodata[rmode.name]
self._graph[oid]["reprodata"][rmode.name]["rg_data"] = (
drop_reprodata[rmode.name]
)
self._graph[oid]["reprodata"][rmode.name]["rg_data"][
"merkleroot"
] = drop_hashes.get(rmode.name, b"")

else:
self._graph[oid]["reprodata"]["rg_data"] = reprodata.get(
"data", {}
self._graph[oid]["reprodata"]["rg_data"] = reprodata.get("data", {})
self._graph[oid]["reprodata"]["rg_data"]["merkleroot"] = reprodata.get(
"merkleroot", b""
)
self._graph[oid]["reprodata"]["rg_data"][
"merkleroot"
] = reprodata.get("merkleroot", b"")

@track_current_session
def finish(self):
self.status = SessionStates.FINISHED
logger.info("Session %s finished", self._sessionId)
for drop, downStreamDrops in droputils.breadFirstTraverse(self._roots):
downStreamDrops[:] = [
dsDrop
for dsDrop in downStreamDrops
if isinstance(dsDrop, AbstractDROP)
dsDrop for dsDrop in downStreamDrops if isinstance(dsDrop, AbstractDROP)
]
if drop.status in (DROPStates.INITIALIZED, DROPStates.WRITING):
drop.setCompleted()
Expand All @@ -595,9 +580,7 @@ def end(self):
logger.info("Session %s ended", self._sessionId)
for drop, downStreamDrops in droputils.breadFirstTraverse(self._roots):
downStreamDrops[:] = [
dsDrop
for dsDrop in downStreamDrops
if isinstance(dsDrop, AbstractDROP)
dsDrop for dsDrop in downStreamDrops if isinstance(dsDrop, AbstractDROP)
]
if drop.status in (DROPStates.INITIALIZED, DROPStates.WRITING):
drop.skip()
Expand All @@ -621,9 +604,7 @@ def getGraphStatus(self):
statusDict = collections.defaultdict(dict)
for drop, downStreamDrops in droputils.breadFirstTraverse(self._roots):
downStreamDrops[:] = [
dsDrop
for dsDrop in downStreamDrops
if isinstance(dsDrop, AbstractDROP)
dsDrop for dsDrop in downStreamDrops if isinstance(dsDrop, AbstractDROP)
]
if isinstance(drop, AppDROP):
statusDict[drop.oid]["execStatus"] = drop.execStatus
Expand All @@ -636,14 +617,11 @@ def cancel(self):
status = self.status
if status != SessionStates.RUNNING:
raise InvalidSessionState(
"Can't cancel this session in its current status: %d"
% (status)
"Can't cancel this session in its current status: %d" % (status)
)
for drop, downStreamDrops in droputils.breadFirstTraverse(self._roots):
downStreamDrops[:] = [
dsDrop
for dsDrop in downStreamDrops
if isinstance(dsDrop, AbstractDROP)
dsDrop for dsDrop in downStreamDrops if isinstance(dsDrop, AbstractDROP)
]
if drop.status not in (
DROPStates.ERROR,
Expand Down Expand Up @@ -682,9 +660,7 @@ def get_drop_property(self, uid, prop_name):
drop = self._drops[uid]
return getattr(drop, prop_name)
except AttributeError:
raise DaliugeException(
"%r has no property called %s" % (drop, prop_name)
)
raise DaliugeException("%r has no property called %s" % (drop, prop_name))

def call_drop(self, uid, method, *args):
if uid not in self._drops:
Expand All @@ -693,9 +669,7 @@ def call_drop(self, uid, method, *args):
drop = self._drops[uid]
m = getattr(drop, method)
except AttributeError:
raise DaliugeException(
"%r has no method called %s" % (drop, method)
)
raise DaliugeException("%r has no method called %s" % (drop, method))
return m(*args)

# Support for the 'with' keyword
Expand Down
10 changes: 8 additions & 2 deletions daliuge-engine/dlg/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,20 @@ def start_nm_in_thread(self, port=constants.NODE_DEFAULT_REST_PORT):
return self._start_manager_in_thread(port, NodeManager, NMRestServer, False)

def start_dim_in_thread(
    self,
    nm_hosts=None,
    port=constants.ISLAND_DEFAULT_REST_PORT,
):
    """Start a DataIslandManager REST server in a background thread.

    :param nm_hosts: NodeManager host:port strings; defaults to a single
        local NodeManager at the default node REST port.  Defaulted to
        ``None`` to avoid the shared-mutable-default-argument pitfall.
    :param port: REST port for the DataIslandManager.
    :return: whatever ``_start_manager_in_thread`` returns.
    """
    if nm_hosts is None:
        nm_hosts = [f"localhost:{constants.NODE_DEFAULT_REST_PORT}"]
    return self._start_manager_in_thread(
        port, DataIslandManager, CompositeManagerRestServer, nm_hosts
    )

def start_mm_in_thread(
self, nm_hosts=["localhost"], port=constants.MASTER_DEFAULT_REST_PORT
self,
nm_hosts=[
f"localhost:{constants.ISLAND_DEFAULT_REST_PORT}",
],
port=constants.MASTER_DEFAULT_REST_PORT,
):
return self._start_manager_in_thread(
port, MasterManager, CompositeManagerRestServer, nm_hosts
Expand Down
8 changes: 3 additions & 5 deletions daliuge-engine/test/deploy/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ def _submit(self):
]
pg = add_test_reprodata(pg)
for drop in pg:
drop["node"] = "localhost"
drop["island"] = "localhost"
drop["node"] = f"localhost:{constants.NODE_DEFAULT_REST_PORT}"
drop["island"] = f"localhost:{constants.ISLAND_DEFAULT_REST_PORT}"
return common.submit(pg, "localhost", self.port)

def assert_sessions_finished(self, status, *session_ids):
Expand All @@ -91,9 +91,7 @@ def test_submit(self):

def test_monitor(self):
    """Submit a graph and monitor the session until it finishes.

    (Reconstructed post-diff version: the scraped diff interleaved the
    removed multi-line call with the added single-line call.)
    """
    session_id = self._submit()
    status = common.monitor_sessions(session_id, port=self.port, poll_interval=0.1)
    self.assert_session_finished(status)

def test_monitor_all(self):
Expand Down
Loading

0 comments on commit 51b7ea0

Please sign in to comment.