Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Liu 186 - Register DIM with MM via zeroconf #156

Merged
merged 15 commits into from
Jun 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 42 additions & 21 deletions daliuge-common/dlg/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from . import constants
from .restutils import RestClient


logger = logging.getLogger(__name__)
compress = os.environ.get("DALIUGE_COMPRESSED_JSON", True)

Expand All @@ -49,7 +48,7 @@ def stop(self):
self._POST("/stop")

def cancelSession(self, sessionId):
self._POST("/sessions/%s/cancel" % quote(sessionId))
self._POST(f"/sessions/{quote(sessionId)}/cancel")

def create_session(self, sessionId):
"""
Expand All @@ -68,7 +67,7 @@ def deploy_session(self, sessionId, completed_uids=[]):
content = None
if completed_uids:
content = {"completed": ",".join(completed_uids)}
self._post_form("/sessions/%s/deploy" % (quote(sessionId),), content)
self._post_form(f"/sessions/{quote(sessionId)}/deploy", content)
logger.debug(
"Successfully deployed session %s on %s:%s", sessionId, self.host, self.port
)
Expand All @@ -79,7 +78,7 @@ def append_graph(self, sessionId, graphSpec):
but checking that the graph looks correct
"""
self._post_json(
"/sessions/%s/graph/append" % (quote(sessionId),),
f"/sessions/{quote(sessionId)}/graph/append",
graphSpec,
compress=compress,
)
Expand All @@ -94,7 +93,7 @@ def destroy_session(self, sessionId):
"""
Destroys session `sessionId`
"""
self._DELETE("/sessions/%s" % (quote(sessionId),))
self._DELETE(f"/sessions/{quote(sessionId)}")
logger.debug(
"Successfully deleted session %s on %s:%s", sessionId, self.host, self.port
)
Expand All @@ -104,7 +103,7 @@ def graph_status(self, sessionId):
Returns a dictionary where the keys are DROP UIDs and the values are
their corresponding status.
"""
ret = self._get_json("/sessions/%s/graph/status" % (quote(sessionId),))
ret = self._get_json(f"/sessions/{quote(sessionId)}/graph/status")
logger.debug(
"Successfully read graph status from session %s on %s:%s",
sessionId,
Expand All @@ -118,7 +117,7 @@ def graph(self, sessionId):
Returns a dictionary where the key are the DROP UIDs, and the values are
the DROP specifications.
"""
graph = self._get_json("/sessions/%s/graph" % (quote(sessionId),))
graph = self._get_json(f"/sessions/{quote(sessionId)}/graph")
logger.debug(
"Successfully read graph (%d nodes) from session %s on %s:%s",
len(graph),
Expand All @@ -137,15 +136,15 @@ def sessions(self):
"Successfully read %d sessions from %s:%s",
len(sessions),
self.host,
self.port,
self.port
)
return sessions

def session(self, sessionId):
"""
Returns the details of sessions `sessionId`
"""
session = self._get_json("/sessions/%s" % (quote(sessionId),))
session = self._get_json(f"/sessions/{quote(sessionId)}")
logger.debug(
"Successfully read session %s from %s:%s", sessionId, self.host, self.port
)
Expand All @@ -155,7 +154,7 @@ def session_status(self, sessionId):
"""
Returns the status of session `sessionId`
"""
status = self._get_json("/sessions/%s/status" % (quote(sessionId),))
status = self._get_json(f"/sessions/{quote(sessionId)}/status")
logger.debug(
"Successfully read session %s status (%s) from %s:%s",
sessionId,
Expand All @@ -169,7 +168,7 @@ def session_repro_status(self, sessionId):
"""
Returns the reproducibility status of session `sessionId`.
"""
status = self._get_json("/sessions/%s/repro/status" % (quote(sessionId),))
status = self._get_json(f"/sessions/{quote(sessionId)}/repro/status")
logger.debug(
"Successfully read session %s reproducibility status (%s) from %s:%s",
sessionId,
Expand All @@ -183,7 +182,7 @@ def session_repro_data(self, sessionId):
"""
Returns the graph-wide reproducibility information of session `sessionId`.
"""
data = self._get_json("/sessions/%s/repro/data" % (quote(sessionId),))
data = self._get_json(f"/sessions/{quote(sessionId)}/repro/data")
logger.debug(
"Successfully read session %s reproducibility data from %s:%s",
sessionId,
Expand All @@ -196,7 +195,7 @@ def graph_size(self, sessionId):
"""
Returns the size of the graph of session `sessionId`
"""
count = self._get_json("/sessions/%s/graph/size" % (quote(sessionId)))
count = self._get_json(f"/sessions/{quote(sessionId)}/graph/size")
logger.debug(
"Successfully read session %s graph size (%d) from %s:%s",
sessionId,
Expand All @@ -223,17 +222,17 @@ class NodeManagerClient(BaseDROPManagerClient):
"""

def __init__(
self, host="localhost", port=constants.NODE_DEFAULT_REST_PORT, timeout=10
self, host="localhost", port=constants.NODE_DEFAULT_REST_PORT, timeout=10
):
super(NodeManagerClient, self).__init__(host=host, port=port, timeout=timeout)

def add_node_subscriptions(self, sessionId, node_subscriptions):
self._post_json(
"/sessions/%s/subscriptions" % (quote(sessionId),), node_subscriptions
f"/sessions/{quote(sessionId)}/subscriptions", node_subscriptions
)

def trigger_drops(self, sessionId, drop_uids):
self._post_json("/sessions/%s/trigger" % (quote(sessionId),), drop_uids)
self._post_json(f"/sessions/{quote(sessionId)}/trigger", drop_uids)

def shutdown_node_manager(self):
    """Ask the remote Node Manager to shut itself down via its REST /shutdown endpoint."""
    self._GET("/shutdown")
Expand All @@ -247,10 +246,10 @@ def nodes(self):
return self._get_json("/nodes")

def add_node(self, node):
self._POST("/nodes/%s" % (node,), content=None)
self._POST(f"/nodes/{node}", content=None)

def remove_node(self, node):
self._DELETE("/nodes/%s" % (node,))
self._DELETE(f"/nodes/{node}")


class DataIslandManagerClient(CompositeManagerClient):
Expand All @@ -259,7 +258,7 @@ class DataIslandManagerClient(CompositeManagerClient):
"""

def __init__(
self, host="localhost", port=constants.ISLAND_DEFAULT_REST_PORT, timeout=10
self, host="localhost", port=constants.ISLAND_DEFAULT_REST_PORT, timeout=10
):
super(DataIslandManagerClient, self).__init__(
host=host, port=port, timeout=timeout
Expand All @@ -272,11 +271,33 @@ class MasterManagerClient(CompositeManagerClient):
"""

def __init__(
self, host="localhost", port=constants.MASTER_DEFAULT_REST_PORT, timeout=10
self, host="localhost", port=constants.MASTER_DEFAULT_REST_PORT, timeout=10
):
super(MasterManagerClient, self).__init__(host=host, port=port, timeout=timeout)

def create_island(self, island_host, nodes):
self._post_json(
"/managers/%s/dataisland" % (quote(island_host)), {"nodes": nodes}
f"/managers/{quote(island_host)}/dataisland", {"nodes": nodes}
)

def dims(self):
    """Return the list of Data Island Managers known to this Master Manager."""
    return self._get_json("/islands")

def add_dim(self, dim):
    """Register Data Island Manager `dim` (a host identifier) with this Master Manager."""
    self._POST(f"/islands/{dim}", content=None)

def remove_dim(self, dim):
    """Unregister Data Island Manager `dim` from this Master Manager."""
    self._DELETE(f"/islands/{dim}")

def add_node_to_dim(self, dim, nm):
    """
    Adds node manager `nm` to data island manager `dim`.

    :param dim: host identifier of the Data Island Manager
    :param nm: host identifier of the Node Manager to register with `dim`
    """
    # Leading slash added for consistency with every other endpoint in this
    # client (e.g. "/managers/%s/dataisland" in create_island, "/islands/%s",
    # "/nodes/%s"); a relative request-target would not resolve to the
    # intended REST resource.
    self._POST(f"/managers/{dim}/nodes/{nm}", content=None)

def remove_node_from_dim(self, dim, nm):
    """
    Removes node manager `nm` from data island manager `dim`.

    :param dim: host identifier of the Data Island Manager
    :param nm: host identifier of the Node Manager to unregister from `dim`
    """
    # Leading slash added for consistency with every other endpoint in this
    # client (see create_island, add_dim, remove_node); a relative
    # request-target would not resolve to the intended REST resource.
    self._DELETE(f"/managers/{dim}/nodes/{nm}")
44 changes: 20 additions & 24 deletions daliuge-engine/dlg/manager/composite_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ class allows for multiple levels of hierarchy seamlessly.
__metaclass__ = abc.ABCMeta

def __init__(
self,
dmPort,
partitionAttr,
subDmId,
dmHosts=[],
pkeyPath=None,
dmCheckTimeout=10,
self,
dmPort,
partitionAttr,
subDmId,
dmHosts=[],
pkeyPath=None,
dmCheckTimeout=10,
):
"""
Creates a new CompositeManager. The sub-DMs it manages are to be located
Expand Down Expand Up @@ -211,6 +211,10 @@ def dmHosts(self):
def addDmHost(self, host):
    """Add `host` to the list of sub-DM hosts managed by this CompositeManager."""
    self._dmHosts.append(host)

def removeDmHost(self, host):
    """Remove `host` from the list of sub-DM hosts; silently ignore unknown hosts."""
    try:
        self._dmHosts.remove(host)
    except ValueError:
        # Host was not registered — same no-op outcome as the membership check.
        pass

@property
def nodes(self):
return self._nodes[:]
Expand All @@ -235,7 +239,7 @@ def dmAt(self, host, port=None):

if not self.check_dm(host, port):
raise SubManagerException(
"Manager expected but not running in %s:%d" % (host, port)
f"Manager expected but not running in {host}:{port}"
)

port = port or self._dmPort
Expand Down Expand Up @@ -284,10 +288,7 @@ def replicate(self, sessionId, f, action, collect=None, iterable=None, port=None
iterable,
)
if thrExs:
msg = "More than one error occurred while %s on session %s" % (
action,
sessionId,
)
msg = f"More than one error occurred while {action} on session {sessionId}"
raise SubManagerException(msg, thrExs)

#
Expand Down Expand Up @@ -347,7 +348,7 @@ def addGraphSpec(self, sessionId, graphSpec):
# belong to the same host, so we can submit that graph into the individual
# DMs. For this we need to make sure that our graph has a the correct
# attribute set
logger.info(f"Separating graph using partition attribute {self._partitionAttr}")
logger.info("Separating graph using partition attribute %s", self._partitionAttr)
perPartition = collections.defaultdict(list)
if "rmode" in graphSpec[-1]:
init_pg_repro_data(graphSpec)
Expand All @@ -360,19 +361,14 @@ def addGraphSpec(self, sessionId, graphSpec):
graphSpec.pop()
for dropSpec in graphSpec:
if self._partitionAttr not in dropSpec:
msg = "Drop %s doesn't specify a %s attribute" % (
dropSpec["oid"],
self._partitionAttr,
)
msg = f"Drop {dropSpec.get('oid', None)} doesn't specify a {self._partitionAttr} " \
f"attribute"
raise InvalidGraphException(msg)

partition = dropSpec[self._partitionAttr]
if partition not in self._dmHosts:
msg = "Drop %s's %s %s does not belong to this DM" % (
dropSpec["oid"],
self._partitionAttr,
partition,
)
msg = f"Drop {dropSpec.get('oid', None)}'s {self._partitionAttr} {partition} " \
f"does not belong to this DM"
raise InvalidGraphException(msg)

perPartition[partition].append(dropSpec)
Expand All @@ -382,7 +378,7 @@ def addGraphSpec(self, sessionId, graphSpec):
# At each partition the relationships between DROPs should be local at the
# moment of submitting the graph; thus we record the inter-partition
# relationships separately and remove them from the original graph spec
logger.info(f"Graph splitted into {perPartition.keys()}")
logger.info("Graph split into %r", perPartition.keys())
inter_partition_rels = []
for dropSpecs in perPartition.values():
inter_partition_rels += graph_loader.removeUnmetRelationships(dropSpecs)
Expand Down Expand Up @@ -448,7 +444,7 @@ def deploySession(self, sessionId, completedDrops=[]):
)
logger.info("Delivered node subscription list to node managers")
logger.debug(
"Number of subscriptions: %s" % len(self._drop_rels[sessionId].items())
"Number of subscriptions: %s", len(self._drop_rels[sessionId].items())
)

logger.info("Deploying Session %s in all hosts", sessionId)
Expand Down
Loading