Skip to content

Commit

Permalink
Merge 3e993f3 into b8c2322
Browse files Browse the repository at this point in the history
  • Loading branch information
pritchardn committed May 31, 2022
2 parents b8c2322 + 3e993f3 commit 92d56be
Show file tree
Hide file tree
Showing 6 changed files with 392 additions and 107 deletions.
63 changes: 42 additions & 21 deletions daliuge-common/dlg/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from . import constants
from .restutils import RestClient


logger = logging.getLogger(__name__)
compress = os.environ.get("DALIUGE_COMPRESSED_JSON", True)

Expand All @@ -49,7 +48,7 @@ def stop(self):
self._POST("/stop")

def cancelSession(self, sessionId):
self._POST("/sessions/%s/cancel" % quote(sessionId))
self._POST(f"/sessions/{quote(sessionId)}/cancel")

def create_session(self, sessionId):
"""
Expand All @@ -68,7 +67,7 @@ def deploy_session(self, sessionId, completed_uids=[]):
content = None
if completed_uids:
content = {"completed": ",".join(completed_uids)}
self._post_form("/sessions/%s/deploy" % (quote(sessionId),), content)
self._post_form(f"/sessions/{quote(sessionId)}/deploy", content)
logger.debug(
"Successfully deployed session %s on %s:%s", sessionId, self.host, self.port
)
Expand All @@ -79,7 +78,7 @@ def append_graph(self, sessionId, graphSpec):
but checking that the graph looks correct
"""
self._post_json(
"/sessions/%s/graph/append" % (quote(sessionId),),
f"/sessions/{quote(sessionId)}/graph/append",
graphSpec,
compress=compress,
)
Expand All @@ -94,7 +93,7 @@ def destroy_session(self, sessionId):
"""
Destroys session `sessionId`
"""
self._DELETE("/sessions/%s" % (quote(sessionId),))
self._DELETE(f"/sessions/{quote(sessionId)}")
logger.debug(
"Successfully deleted session %s on %s:%s", sessionId, self.host, self.port
)
Expand All @@ -104,7 +103,7 @@ def graph_status(self, sessionId):
Returns a dictionary where the keys are DROP UIDs and the values are
their corresponding status.
"""
ret = self._get_json("/sessions/%s/graph/status" % (quote(sessionId),))
ret = self._get_json(f"/sessions/{quote(sessionId)}/graph/status")
logger.debug(
"Successfully read graph status from session %s on %s:%s",
sessionId,
Expand All @@ -118,7 +117,7 @@ def graph(self, sessionId):
Returns a dictionary where the key are the DROP UIDs, and the values are
the DROP specifications.
"""
graph = self._get_json("/sessions/%s/graph" % (quote(sessionId),))
graph = self._get_json(f"/sessions/{quote(sessionId)}/graph")
logger.debug(
"Successfully read graph (%d nodes) from session %s on %s:%s",
len(graph),
Expand All @@ -137,15 +136,15 @@ def sessions(self):
"Successfully read %d sessions from %s:%s",
len(sessions),
self.host,
self.port,
self.port
)
return sessions

def session(self, sessionId):
"""
Returns the details of sessions `sessionId`
"""
session = self._get_json("/sessions/%s" % (quote(sessionId),))
session = self._get_json(f"/sessions/{quote(sessionId)}")
logger.debug(
"Successfully read session %s from %s:%s", sessionId, self.host, self.port
)
Expand All @@ -155,7 +154,7 @@ def session_status(self, sessionId):
"""
Returns the status of session `sessionId`
"""
status = self._get_json("/sessions/%s/status" % (quote(sessionId),))
status = self._get_json(f"/sessions/{quote(sessionId)}/status")
logger.debug(
"Successfully read session %s status (%s) from %s:%s",
sessionId,
Expand All @@ -169,7 +168,7 @@ def session_repro_status(self, sessionId):
"""
Returns the reproducibility status of session `sessionId`.
"""
status = self._get_json("/sessions/%s/repro/status" % (quote(sessionId),))
status = self._get_json(f"/sessions/{quote(sessionId)}/repro/status")
logger.debug(
"Successfully read session %s reproducibility status (%s) from %s:%s",
sessionId,
Expand All @@ -183,7 +182,7 @@ def session_repro_data(self, sessionId):
"""
Returns the graph-wide reproducibility information of session `sessionId`.
"""
data = self._get_json("/sessions/%s/repro/data" % (quote(sessionId),))
data = self._get_json(f"/sessions/{quote(sessionId)}/repro/data")
logger.debug(
"Successfully read session %s reproducibility data from %s:%s",
sessionId,
Expand All @@ -196,7 +195,7 @@ def graph_size(self, sessionId):
"""
Returns the size of the graph of session `sessionId`
"""
count = self._get_json("/sessions/%s/graph/size" % (quote(sessionId)))
count = self._get_json(f"/sessions/{quote(sessionId)}/graph/size")
logger.debug(
"Successfully read session %s graph size (%d) from %s:%s",
sessionId,
Expand All @@ -223,17 +222,17 @@ class NodeManagerClient(BaseDROPManagerClient):
"""

def __init__(
self, host="localhost", port=constants.NODE_DEFAULT_REST_PORT, timeout=10
self, host="localhost", port=constants.NODE_DEFAULT_REST_PORT, timeout=10
):
super(NodeManagerClient, self).__init__(host=host, port=port, timeout=timeout)

def add_node_subscriptions(self, sessionId, node_subscriptions):
self._post_json(
"/sessions/%s/subscriptions" % (quote(sessionId),), node_subscriptions
f"/sessions/{quote(sessionId)}/subscriptions", node_subscriptions
)

def trigger_drops(self, sessionId, drop_uids):
self._post_json("/sessions/%s/trigger" % (quote(sessionId),), drop_uids)
self._post_json(f"/sessions/{quote(sessionId)}/trigger", drop_uids)

def shutdown_node_manager(self):
self._GET("/shutdown")
Expand All @@ -247,10 +246,10 @@ def nodes(self):
return self._get_json("/nodes")

def add_node(self, node):
self._POST("/nodes/%s" % (node,), content=None)
self._POST(f"/nodes/{node}", content=None)

def remove_node(self, node):
self._DELETE("/nodes/%s" % (node,))
self._DELETE(f"/nodes/{node}")


class DataIslandManagerClient(CompositeManagerClient):
Expand All @@ -259,7 +258,7 @@ class DataIslandManagerClient(CompositeManagerClient):
"""

def __init__(
self, host="localhost", port=constants.ISLAND_DEFAULT_REST_PORT, timeout=10
self, host="localhost", port=constants.ISLAND_DEFAULT_REST_PORT, timeout=10
):
super(DataIslandManagerClient, self).__init__(
host=host, port=port, timeout=timeout
Expand All @@ -272,11 +271,33 @@ class MasterManagerClient(CompositeManagerClient):
"""

def __init__(
self, host="localhost", port=constants.MASTER_DEFAULT_REST_PORT, timeout=10
self, host="localhost", port=constants.MASTER_DEFAULT_REST_PORT, timeout=10
):
super(MasterManagerClient, self).__init__(host=host, port=port, timeout=timeout)

def create_island(self, island_host, nodes):
self._post_json(
"/managers/%s/dataisland" % (quote(island_host)), {"nodes": nodes}
f"/managers/{quote(island_host)}/dataisland", {"nodes": nodes}
)

def dims(self):
return self._get_json("/islands")

def add_dim(self, dim):
self._POST(f"/islands/{dim}", content=None)

def remove_dim(self, dim):
self._DELETE(f"/islands/{dim}")

def add_node_to_dim(self, dim, nm):
"""
Adds a nm to a dim
"""
self._POST(
f"managers/{dim}/nodes/{nm}", content=None, )

def remove_node_from_dim(self, dim, nm):
"""
Removes a nm from a dim
"""
self._DELETE(f"managers/{dim}/nodes/{nm}")
44 changes: 20 additions & 24 deletions daliuge-engine/dlg/manager/composite_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ class allows for multiple levels of hierarchy seamlessly.
__metaclass__ = abc.ABCMeta

def __init__(
self,
dmPort,
partitionAttr,
subDmId,
dmHosts=[],
pkeyPath=None,
dmCheckTimeout=10,
self,
dmPort,
partitionAttr,
subDmId,
dmHosts=[],
pkeyPath=None,
dmCheckTimeout=10,
):
"""
Creates a new CompositeManager. The sub-DMs it manages are to be located
Expand Down Expand Up @@ -211,6 +211,10 @@ def dmHosts(self):
def addDmHost(self, host):
self._dmHosts.append(host)

def removeDmHost(self, host):
if host in self._dmHosts:
self._dmHosts.remove(host)

@property
def nodes(self):
return self._nodes[:]
Expand All @@ -235,7 +239,7 @@ def dmAt(self, host, port=None):

if not self.check_dm(host, port):
raise SubManagerException(
"Manager expected but not running in %s:%d" % (host, port)
f"Manager expected but not running in {host}:{port}"
)

port = port or self._dmPort
Expand Down Expand Up @@ -284,10 +288,7 @@ def replicate(self, sessionId, f, action, collect=None, iterable=None, port=None
iterable,
)
if thrExs:
msg = "More than one error occurred while %s on session %s" % (
action,
sessionId,
)
msg = f"More than one error occurred while {action} on session {sessionId}"
raise SubManagerException(msg, thrExs)

#
Expand Down Expand Up @@ -347,7 +348,7 @@ def addGraphSpec(self, sessionId, graphSpec):
# belong to the same host, so we can submit that graph into the individual
# DMs. For this we need to make sure that our graph has a the correct
# attribute set
logger.info(f"Separating graph using partition attribute {self._partitionAttr}")
logger.info("Separating graph using partition attribute %s", self._partitionAttr)
perPartition = collections.defaultdict(list)
if "rmode" in graphSpec[-1]:
init_pg_repro_data(graphSpec)
Expand All @@ -360,19 +361,14 @@ def addGraphSpec(self, sessionId, graphSpec):
graphSpec.pop()
for dropSpec in graphSpec:
if self._partitionAttr not in dropSpec:
msg = "Drop %s doesn't specify a %s attribute" % (
dropSpec["oid"],
self._partitionAttr,
)
msg = f"Drop {dropSpec.get('oid', None)} doesn't specify a {self._partitionAttr} " \
f"attribute"
raise InvalidGraphException(msg)

partition = dropSpec[self._partitionAttr]
if partition not in self._dmHosts:
msg = "Drop %s's %s %s does not belong to this DM" % (
dropSpec["oid"],
self._partitionAttr,
partition,
)
msg = f"Drop {dropSpec.get('oid', None)}'s {self._partitionAttr} {partition} " \
f"does not belong to this DM"
raise InvalidGraphException(msg)

perPartition[partition].append(dropSpec)
Expand All @@ -382,7 +378,7 @@ def addGraphSpec(self, sessionId, graphSpec):
# At each partition the relationships between DROPs should be local at the
# moment of submitting the graph; thus we record the inter-partition
# relationships separately and remove them from the original graph spec
logger.info(f"Graph splitted into {perPartition.keys()}")
logger.info("Graph split into %r", perPartition.keys())
inter_partition_rels = []
for dropSpecs in perPartition.values():
inter_partition_rels += graph_loader.removeUnmetRelationships(dropSpecs)
Expand Down Expand Up @@ -448,7 +444,7 @@ def deploySession(self, sessionId, completedDrops=[]):
)
logger.info("Delivered node subscription list to node managers")
logger.debug(
"Number of subscriptions: %s" % len(self._drop_rels[sessionId].items())
"Number of subscriptions: %s", len(self._drop_rels[sessionId].items())
)

logger.info("Deploying Session %s in all hosts", sessionId)
Expand Down
Loading

0 comments on commit 92d56be

Please sign in to comment.