diff --git a/daliuge-common/dlg/clients.py b/daliuge-common/dlg/clients.py index bfef4ef66..ed9f2bb82 100644 --- a/daliuge-common/dlg/clients.py +++ b/daliuge-common/dlg/clients.py @@ -26,7 +26,6 @@ from . import constants from .restutils import RestClient - logger = logging.getLogger(__name__) compress = os.environ.get("DALIUGE_COMPRESSED_JSON", True) @@ -49,7 +48,7 @@ def stop(self): self._POST("/stop") def cancelSession(self, sessionId): - self._POST("/sessions/%s/cancel" % quote(sessionId)) + self._POST(f"/sessions/{quote(sessionId)}/cancel") def create_session(self, sessionId): """ @@ -68,7 +67,7 @@ def deploy_session(self, sessionId, completed_uids=[]): content = None if completed_uids: content = {"completed": ",".join(completed_uids)} - self._post_form("/sessions/%s/deploy" % (quote(sessionId),), content) + self._post_form(f"/sessions/{quote(sessionId)}/deploy", content) logger.debug( "Successfully deployed session %s on %s:%s", sessionId, self.host, self.port ) @@ -79,7 +78,7 @@ def append_graph(self, sessionId, graphSpec): but checking that the graph looks correct """ self._post_json( - "/sessions/%s/graph/append" % (quote(sessionId),), + f"/sessions/{quote(sessionId)}/graph/append", graphSpec, compress=compress, ) @@ -94,7 +93,7 @@ def destroy_session(self, sessionId): """ Destroys session `sessionId` """ - self._DELETE("/sessions/%s" % (quote(sessionId),)) + self._DELETE(f"/sessions/{quote(sessionId)}") logger.debug( "Successfully deleted session %s on %s:%s", sessionId, self.host, self.port ) @@ -104,7 +103,7 @@ def graph_status(self, sessionId): Returns a dictionary where the keys are DROP UIDs and the values are their corresponding status. """ - ret = self._get_json("/sessions/%s/graph/status" % (quote(sessionId),)) + ret = self._get_json(f"/sessions/{quote(sessionId)}/graph/status") logger.debug( "Successfully read graph status from session %s on %s:%s", sessionId, @@ -118,7 +117,7 @@ def graph(self, sessionId): Returns a dictionary where the key are the DROP UIDs, and the values are the DROP specifications. """ - graph = self._get_json("/sessions/%s/graph" % (quote(sessionId),)) + graph = self._get_json(f"/sessions/{quote(sessionId)}/graph") logger.debug( "Successfully read graph (%d nodes) from session %s on %s:%s", len(graph), @@ -137,7 +136,7 @@ def sessions(self): "Successfully read %d sessions from %s:%s", len(sessions), self.host, - self.port, + self.port ) return sessions @@ -145,7 +144,7 @@ def session(self, sessionId): """ Returns the details of sessions `sessionId` """ - session = self._get_json("/sessions/%s" % (quote(sessionId),)) + session = self._get_json(f"/sessions/{quote(sessionId)}") logger.debug( "Successfully read session %s from %s:%s", sessionId, self.host, self.port ) @@ -155,7 +154,7 @@ def session_status(self, sessionId): """ Returns the status of session `sessionId` """ - status = self._get_json("/sessions/%s/status" % (quote(sessionId),)) + status = self._get_json(f"/sessions/{quote(sessionId)}/status") logger.debug( "Successfully read session %s status (%s) from %s:%s", sessionId, @@ -169,7 +168,7 @@ def session_repro_status(self, sessionId): """ Returns the reproducibility status of session `sessionId`. 
""" - status = self._get_json("/sessions/%s/repro/status" % (quote(sessionId),)) + status = self._get_json(f"/sessions/{quote(sessionId)}/repro/status") logger.debug( "Successfully read session %s reproducibility status (%s) from %s:%s", sessionId, @@ -183,7 +182,7 @@ def session_repro_data(self, sessionId): """ Returns the graph-wide reproducibility information of session `sessionId`. """ - data = self._get_json("/sessions/%s/repro/data" % (quote(sessionId),)) + data = self._get_json(f"/sessions/{quote(sessionId)}/repro/data") logger.debug( "Successfully read session %s reproducibility data from %s:%s", sessionId, @@ -196,7 +195,7 @@ def graph_size(self, sessionId): """ Returns the size of the graph of session `sessionId` """ - count = self._get_json("/sessions/%s/graph/size" % (quote(sessionId))) + count = self._get_json(f"/sessions/{quote(sessionId)}/graph/size") logger.debug( "Successfully read session %s graph size (%d) from %s:%s", sessionId, @@ -223,17 +222,17 @@ class NodeManagerClient(BaseDROPManagerClient): """ def __init__( - self, host="localhost", port=constants.NODE_DEFAULT_REST_PORT, timeout=10 + self, host="localhost", port=constants.NODE_DEFAULT_REST_PORT, timeout=10 ): super(NodeManagerClient, self).__init__(host=host, port=port, timeout=timeout) def add_node_subscriptions(self, sessionId, node_subscriptions): self._post_json( - "/sessions/%s/subscriptions" % (quote(sessionId),), node_subscriptions + f"/sessions/{quote(sessionId)}/subscriptions", node_subscriptions ) def trigger_drops(self, sessionId, drop_uids): - self._post_json("/sessions/%s/trigger" % (quote(sessionId),), drop_uids) + self._post_json(f"/sessions/{quote(sessionId)}/trigger", drop_uids) def shutdown_node_manager(self): self._GET("/shutdown") @@ -247,10 +246,10 @@ def nodes(self): return self._get_json("/nodes") def add_node(self, node): - self._POST("/nodes/%s" % (node,), content=None) + self._POST(f"/nodes/{node}", content=None) def remove_node(self, node): - self._DELETE("/nodes/%s" % (node,)) + self._DELETE(f"/nodes/{node}") class DataIslandManagerClient(CompositeManagerClient): @@ -259,7 +258,7 @@ class DataIslandManagerClient(CompositeManagerClient): """ def __init__( - self, host="localhost", port=constants.ISLAND_DEFAULT_REST_PORT, timeout=10 + self, host="localhost", port=constants.ISLAND_DEFAULT_REST_PORT, timeout=10 ): super(DataIslandManagerClient, self).__init__( host=host, port=port, timeout=timeout @@ -272,11 +271,33 @@ class MasterManagerClient(CompositeManagerClient): """ def __init__( - self, host="localhost", port=constants.MASTER_DEFAULT_REST_PORT, timeout=10 + self, host="localhost", port=constants.MASTER_DEFAULT_REST_PORT, timeout=10 ): super(MasterManagerClient, self).__init__(host=host, port=port, timeout=timeout) def create_island(self, island_host, nodes): self._post_json( - "/managers/%s/dataisland" % (quote(island_host)), {"nodes": nodes} + f"/managers/{quote(island_host)}/dataisland", {"nodes": nodes} ) + + def dims(self): + return self._get_json("/islands") + + def add_dim(self, dim): + self._POST(f"/islands/{dim}", content=None) + + def remove_dim(self, dim): + self._DELETE(f"/islands/{dim}") + + def add_node_to_dim(self, dim, nm): + """ + Adds a nm to a dim + """ + self._POST( + f"managers/{dim}/nodes/{nm}", content=None, ) + + def remove_node_from_dim(self, dim, nm): + """ + Removes a nm from a dim + """ + self._DELETE(f"managers/{dim}/nodes/{nm}") diff --git a/daliuge-engine/dlg/manager/composite_manager.py b/daliuge-engine/dlg/manager/composite_manager.py index 
8e34c9d6b..198f2e6a6 100644 --- a/daliuge-engine/dlg/manager/composite_manager.py +++ b/daliuge-engine/dlg/manager/composite_manager.py @@ -127,13 +127,13 @@ class allows for multiple levels of hierarchy seamlessly. __metaclass__ = abc.ABCMeta def __init__( - self, - dmPort, - partitionAttr, - subDmId, - dmHosts=[], - pkeyPath=None, - dmCheckTimeout=10, + self, + dmPort, + partitionAttr, + subDmId, + dmHosts=[], + pkeyPath=None, + dmCheckTimeout=10, ): """ Creates a new CompositeManager. The sub-DMs it manages are to be located @@ -211,6 +211,10 @@ def dmHosts(self): def addDmHost(self, host): self._dmHosts.append(host) + def removeDmHost(self, host): + if host in self._dmHosts: + self._dmHosts.remove(host) + @property def nodes(self): return self._nodes[:] @@ -235,7 +239,7 @@ def dmAt(self, host, port=None): if not self.check_dm(host, port): raise SubManagerException( - "Manager expected but not running in %s:%d" % (host, port) + f"Manager expected but not running in {host}:{port}" ) port = port or self._dmPort @@ -284,10 +288,7 @@ def replicate(self, sessionId, f, action, collect=None, iterable=None, port=None iterable, ) if thrExs: - msg = "More than one error occurred while %s on session %s" % ( - action, - sessionId, - ) + msg = f"More than one error occurred while {action} on session {sessionId}" raise SubManagerException(msg, thrExs) # @@ -347,7 +348,7 @@ def addGraphSpec(self, sessionId, graphSpec): # belong to the same host, so we can submit that graph into the individual # DMs. For this we need to make sure that our graph has a the correct # attribute set - logger.info(f"Separating graph using partition attribute {self._partitionAttr}") + logger.info("Separating graph using partition attribute %s", self._partitionAttr) perPartition = collections.defaultdict(list) if "rmode" in graphSpec[-1]: init_pg_repro_data(graphSpec) @@ -360,19 +361,14 @@ def addGraphSpec(self, sessionId, graphSpec): graphSpec.pop() for dropSpec in graphSpec: if self._partitionAttr not in dropSpec: - msg = "Drop %s doesn't specify a %s attribute" % ( - dropSpec["oid"], - self._partitionAttr, - ) + msg = f"Drop {dropSpec.get('oid', None)} doesn't specify a {self._partitionAttr} " \ + f"attribute" raise InvalidGraphException(msg) partition = dropSpec[self._partitionAttr] if partition not in self._dmHosts: - msg = "Drop %s's %s %s does not belong to this DM" % ( - dropSpec["oid"], - self._partitionAttr, - partition, - ) + msg = f"Drop {dropSpec.get('oid', None)}'s {self._partitionAttr} {partition} " \ + f"does not belong to this DM" raise InvalidGraphException(msg) perPartition[partition].append(dropSpec) @@ -382,7 +378,7 @@ def addGraphSpec(self, sessionId, graphSpec): # At each partition the relationships between DROPs should be local at the # moment of submitting the graph; thus we record the inter-partition # relationships separately and remove them from the original graph spec - logger.info(f"Graph splitted into {perPartition.keys()}") + logger.info("Graph split into %r", perPartition.keys()) inter_partition_rels = [] for dropSpecs in perPartition.values(): inter_partition_rels += graph_loader.removeUnmetRelationships(dropSpecs) @@ -448,7 +444,7 @@ def deploySession(self, sessionId, completedDrops=[]): ) logger.info("Delivered node subscription list to node managers") logger.debug( - "Number of subscriptions: %s" % len(self._drop_rels[sessionId].items()) + "Number of subscriptions: %s", len(self._drop_rels[sessionId].items()) ) logger.info("Deploying Session %s in all hosts", sessionId) diff --git 
a/daliuge-engine/dlg/manager/proc_daemon.py b/daliuge-engine/dlg/manager/proc_daemon.py index 5244a30a2..6f49e4f90 100644 --- a/daliuge-engine/dlg/manager/proc_daemon.py +++ b/daliuge-engine/dlg/manager/proc_daemon.py @@ -37,6 +37,7 @@ from . import constants, client from .. import utils from ..restserver import RestServer +from dlg.nm_dim_assigner import NMAssigner logger = logging.getLogger(__name__) @@ -74,7 +75,6 @@ def __init__(self, master=False, noNM=False, disable_zeroconf=False, verbosity=0 self._shutting_down = False self._verbosity = verbosity - # The three processes we run self._nm_proc = None self._dim_proc = None @@ -83,7 +83,9 @@ def __init__(self, master=False, noNM=False, disable_zeroconf=False, verbosity=0 # Zeroconf for NM and MM self._zeroconf = None if disable_zeroconf else zc.Zeroconf() self._nm_info = None - self._mm_browser = None + self._dim_info = None + self._mm_nm_browser = None + self._mm_dim_browser = None # Starting managers app = self.app @@ -125,9 +127,12 @@ def _stop_zeroconf(self): return # Stop the MM service browser, the NM registration, and ZC itself - if self._mm_browser: - self._mm_browser.cancel() - self._mm_browser.join() + if self._mm_nm_browser: + self._mm_nm_browser.cancel() + self._mm_nm_browser.join() + if self._mm_dim_browser: + self._mm_dim_browser.cancel() + self._mm_dim_browser.join() self._zeroconf.close() logger.info("Zeroconf stopped") @@ -166,6 +171,8 @@ def stopNM(self, timeout=10): return self._stop_manager("_nm_proc", timeout) def stopDIM(self, timeout=10): + if self._dim_info: + utils.deregister_service(self._zeroconf, self._dim_info) self._stop_manager("_dim_proc", timeout) def stopMM(self, timeout=10): @@ -176,15 +183,15 @@ def startNM(self): tool = get_tool() args = ["--host", "0.0.0.0"] args += self._verbosity_as_cmdline() - logger.info("Starting Node Drop Manager with args: %s" % (" ".join(args))) + logger.info("Starting Node Drop Manager with args: %s", (" ".join(args))) self._nm_proc = tool.start_process("nm", args) - logger.info("Started Node Drop Manager with PID %d" % (self._nm_proc.pid)) + logger.info("Started Node Drop Manager with PID %d", self._nm_proc.pid) # Registering the new NodeManager via zeroconf so it gets discovered # by the Master Manager if self._zeroconf: addrs = utils.get_local_ip_addr() - logger.info("Registering this NM with zeroconf: %s" % addrs) + logger.info("Registering this NM with zeroconf: %s", addrs) self._nm_info = utils.register_service( self._zeroconf, "NodeManager", @@ -201,58 +208,80 @@ def startDIM(self, nodes): if nodes: args += ["--nodes", ",".join(nodes)] logger.info( - "Starting Data Island Drop Manager with args: %s" % (" ".join(args)) + "Starting Data Island Drop Manager with args: %s", (" ".join(args)) ) self._dim_proc = tool.start_process("dim", args) logger.info( - "Started Data Island Drop Manager with PID %d" % (self._dim_proc.pid) + "Started Data Island Drop Manager with PID %d", self._dim_proc.pid ) + + # Registering the new DIM via zeroconf so it gets discovered + # by the Master Manager + if self._zeroconf: + addrs = utils.get_local_ip_addr() + logger.info("Registering this DIM with zeroconf: %s", addrs) + self._dim_info = utils.register_service( + self._zeroconf, + "DIM", + socket.gethostname(), + addrs[0][0], + constants.ISLAND_DEFAULT_REST_PORT, + ) return def startMM(self): tool = get_tool() args = ["--host", "0.0.0.0"] args += self._verbosity_as_cmdline() - logger.info("Starting Master Drop Manager with args: %s" % (" ".join(args))) + logger.info("Starting Master Drop 
Manager with args: %s", (" ".join(args))) self._mm_proc = tool.start_process("mm", args) - logger.info("Started Master Drop Manager with PID %d" % (self._mm_proc.pid)) + logger.info("Started Master Drop Manager with PID %d", self._mm_proc.pid) # Also subscribe to zeroconf events coming from NodeManagers and feed # the Master Manager with the new hosts we find if self._zeroconf: - mm_client = client.MasterManagerClient() - node_managers = {} + nm_assigner = NMAssigner() - def nm_callback(zeroconf, service_type, name, state_change): + def _callback(zeroconf, service_type, name, state_change, adder, remover, accessor): info = zeroconf.get_service_info(service_type, name) if state_change is zc.ServiceStateChange.Added: server = socket.inet_ntoa(_get_address(info)) port = info.port - node_managers[name] = (server, port) + adder(name, server, port) logger.info( - "Found a new Node Manager on %s:%d, will add it to the MM" - % (server, port) + "Found a new %s on %s:%d, will add it to the MM", + service_type, server, port ) - mm_client.add_node(server) elif state_change is zc.ServiceStateChange.Removed: - server, port = node_managers[name] + server, port = accessor(name) logger.info( - "Node Manager on %s:%d disappeared, removing it from the MM" - % (server, port) + "%s on %s:%d disappeared, removing it from the MM", + service_type, server, port ) # Don't bother to remove it if we're shutting down. This way # we avoid hanging in here if the MM is down already but # we are trying to remove our NM who has just disappeared if not self._shutting_down: - try: - mm_client.remove_node(server) - finally: - del node_managers[name] + remover(name) + + nm_callback = functools.partial(_callback, adder=nm_assigner.add_nm, + remover=nm_assigner.remove_nm, + accessor=nm_assigner.get_nm) - self._mm_browser = utils.browse_service( + dim_callback = functools.partial(_callback, adder=nm_assigner.add_dim, + remover=nm_assigner.remove_dim, + accessor=nm_assigner.get_dim) + + self._mm_nm_browser = utils.browse_service( self._zeroconf, "NodeManager", "tcp", nm_callback ) + self._mm_dim_browser = utils.browse_service( + self._zeroconf, + "DIM", + "tcp", + dim_callback, # DIM since name must be < 15 bytes + ) logger.info("Zeroconf started") return @@ -278,7 +307,7 @@ def _rest_stop_manager(self, proc, stop_method): def _rest_get_manager_info(self, proc): if proc: bottle.response.content_type = "application/json" - logger.info("Sending response: %s" % json.dumps({"pid": proc.pid})) + logger.info("Sending response: %s", json.dumps({"pid": proc.pid})) return json.dumps({"pid": proc.pid}) else: return json.dumps({"pid": None}) @@ -296,7 +325,7 @@ def rest_getMgrs(self): if mgrs["node"]: mgrs["node"] = self._nm_proc.pid - logger.info("Sending response: %s" % json.dumps(mgrs)) + logger.info("Sending response: %s", json.dumps(mgrs)) return json.dumps(mgrs) def rest_startNM(self): @@ -396,7 +425,7 @@ def handle_signal(signalNo, stack_frame): global terminating if terminating: return - logger.info("Received signal %d, will stop the daemon now" % (signalNo,)) + logger.info("Received signal %d, will stop the daemon now", signalNo) terminating = True daemon.stop(10) diff --git a/daliuge-engine/dlg/manager/rest.py b/daliuge-engine/dlg/manager/rest.py index 5d1af58bf..2e8454f96 100644 --- a/daliuge-engine/dlg/manager/rest.py +++ b/daliuge-engine/dlg/manager/rest.py @@ -191,8 +191,8 @@ def _stop_manager(self): self.dm.shutdown() self.stop() logger.info( - "Thanks for using our %s, come back again :-)" - % (self.dm.__class__.__name__) + 
"Thanks for using our %s, come back again :-)", + self.dm.__class__.__name__ ) @daliuge_aware @@ -400,7 +400,7 @@ def linkGraphParts(self, sessionId): @daliuge_aware def add_node_subscriptions(self, sessionId): - logger.debug(f"NM REST call: add_subscriptions {bottle.request.json}") + logger.debug("NM REST call: add_subscriptions %s", bottle.request.json) if bottle.request.content_type != "application/json": bottle.response.status = 415 return @@ -471,18 +471,18 @@ def getAllCMNodes(self): @daliuge_aware def addCMNode(self, node): - logger.debug("Adding node %s" % node) + logger.debug("Adding node %s", node) self.dm.add_node(node) @daliuge_aware def removeCMNode(self, node): - logger.debug("Removing node %s" % node) + logger.debug("Removing node %s", node) self.dm.remove_node(node) @daliuge_aware def getNodeSessions(self, node): if node not in self.dm.nodes: - raise Exception("%s not in current list of nodes" % (node,)) + raise Exception(f"{node} not in current list of nodes") with NodeManagerClient(host=node) as dm: return dm.sessions() @@ -527,28 +527,28 @@ def getLogFile(self, sessionId): @daliuge_aware def getNodeSessionInformation(self, node, sessionId): if node not in self.dm.nodes: - raise Exception("%s not in current list of nodes" % (node,)) + raise Exception(f"{node} not in current list of nodes") with NodeManagerClient(host=node) as dm: return dm.session(sessionId) @daliuge_aware def getNodeSessionStatus(self, node, sessionId): if node not in self.dm.nodes: - raise Exception("%s not in current list of nodes" % (node,)) + raise Exception(f"{node} not in current list of nodes") with NodeManagerClient(host=node) as dm: return dm.session_status(sessionId) @daliuge_aware def getNodeGraph(self, node, sessionId): if node not in self.dm.nodes: - raise Exception("%s not in current list of nodes" % (node,)) + raise Exception(f"{node} not in current list of nodes") with NodeManagerClient(host=node) as dm: return dm.graph(sessionId) @daliuge_aware def getNodeGraphStatus(self, node, sessionId): if node not in self.dm.nodes: - raise Exception("%s not in current list of nodes" % (node,)) + raise Exception(f"{node} not in current list of nodes") with NodeManagerClient(host=node) as dm: return dm.graph_status(sessionId) @@ -576,12 +576,15 @@ def visualizeDIM(self): class MasterManagerRestServer(CompositeManagerRestServer): def initializeSpecifics(self, app): CompositeManagerRestServer.initializeSpecifics(self, app) - + # DIM manamagement + app.post("/api/islands/", callback=self.addDIM) + app.delete("/api/islands/", callback=self.removeDIM) # Query forwarding to daemons - app.post("/api/managers//dataisland", callback=self.createDataIsland) app.post("/api/managers//node/start", callback=self.startNM) app.post("/api/managers//node/stop", callback=self.stopNM) + app.post("/api/managers//nodes/", callback=self.addNM) + app.delete("/api/managers//nodes/", callback=self.removeNM) # Querying about managers app.get("/api/islands", callback=self.getDIMs) app.get("/api/nodes", callback=self.getNMs) @@ -601,6 +604,16 @@ def createDataIsland(self, host): def getDIMs(self): return {"islands": self.dm.dmHosts} + @daliuge_aware + def addDIM(self, dim): + logger.debug("Adding DIM %s", dim) + self.dm.addDmHost(dim) + + @daliuge_aware + def removeDIM(self, dim): + logger.debug("Removing dim %s", dim) + self.dm.removeDmHost(dim) + @daliuge_aware def getNMs(self): return {"nodes": self.dm.nodes} @@ -608,21 +621,39 @@ def getNMs(self): @daliuge_aware def startNM(self, host): port = 
-        logger.debug("Sending NM start request to %s:%s" % (host, port))
+        logger.debug("Sending NM start request to %s:%s", host, port)
         with RestClient(host=host, port=port, timeout=10) as c:
             return json.loads(c._POST("/managers/node/start").read())
 
     @daliuge_aware
     def stopNM(self, host):
         port = constants.DAEMON_DEFAULT_REST_PORT
-        logger.debug("Sending NM stop request to %s:%s" % (host, port))
+        logger.debug("Sending NM stop request to %s:%s", host, port)
         with RestClient(host=host, port=port, timeout=10) as c:
             return json.loads(c._POST("/managers/node/stop").read())
 
+    @daliuge_aware
+    def addNM(self, host, node):
+        port = constants.ISLAND_DEFAULT_REST_PORT
+        logger.debug("Adding NM %s to DIM %s", node, host)
+        with RestClient(host=host, port=port, timeout=10, url_prefix="/api") as c:
+            return json.loads(
+                c._POST(
+                    f"/nodes/{node}",
+                ).read()
+            )
+
+    @daliuge_aware
+    def removeNM(self, host, node):
+        port = constants.ISLAND_DEFAULT_REST_PORT
+        logger.debug("Removing NM %s from DIM %s", node, host)
+        with RestClient(host=host, port=port, timeout=10, url_prefix="/api") as c:
+            return json.loads(c._DELETE(f"/nodes/{node}").read())
+
     @daliuge_aware
     def getNMInfo(self, host):
         port = constants.DAEMON_DEFAULT_REST_PORT
-        logger.debug("Sending request %s:%s/managers/node" % (host, port))
+        logger.debug("Sending request %s:%s/managers/node", host, port)
         with RestClient(host=host, port=port, timeout=10) as c:
             return json.loads(c._GET("/managers/node").read())
diff --git a/daliuge-engine/dlg/nm_dim_assigner.py b/daliuge-engine/dlg/nm_dim_assigner.py
new file mode 100644
index 000000000..2ffa0403c
--- /dev/null
+++ b/daliuge-engine/dlg/nm_dim_assigner.py
@@ -0,0 +1,77 @@
+import logging
+from dlg.manager import client
+
+logger = logging.getLogger(__name__)
+
+
+class NMAssigner:
+    """
+    Handles the logic of assigning/removing NMs to/from DIMs and handling new DIMs coming on/off-line.
+    Maintains a ledger of DIMs and NMs as name -> (server, port) mappings.
+    Maintains a mapping of NMs to DIMs.
+    Allocation logic is currently very simple, handling only the case of a single DIM, but with
+    room for more advanced load-balancing, should we require it in the future.
+ """ + def __init__(self): + self.DIMs = {} # Set of DIMs name -> (server, port) + self.NMs = {} # Set of NMs name -> (server, port) + self.assignments = {} # Maps NMs to DIMs + self.mm_client = client.MasterManagerClient() + + def add_dim(self, name, server, port): + self.DIMs[name] = (server, port) + self.mm_client.add_dim(server) + self.allocate_nms() + + def add_nm(self, name, server, port): + self.NMs[name] = (server, port) + self.mm_client.add_node(server) + self.allocate_nms() + + def get_dim(self, name): + return self.DIMs[name] + + def get_nm(self, name): + return self.NMs[name] + + def remove_dim(self, name): + server, port = self.DIMs[name] + try: + for nm in self.NMs: + if self.assignments[nm] == server: + del self.assignments[nm] + self.mm_client.remove_dim(server) + finally: + del self.DIMs[name] + return server, port + + def remove_nm(self, name): + server, _ = self.NMs[name] + try: + if name in self.assignments: + dim_ip = self.assignments[name] + nm_ip = self.NMs[name][0] + self.mm_client.remove_node_from_dim(dim_ip, nm_ip) + self.mm_client.remove_node(server) + finally: + del self.NMs[name] + + def allocate_nms(self): + if self.DIMs == {}: + logger.info("We have no DIMs") + elif len(self.DIMs.keys()) == 1: + dim_ip = list(self.DIMs.values())[0][0] # IP of the first (only) DIM + for nm, endpoint in self.NMs.items(): + nm_ip = endpoint[0] + if nm not in self.assignments: + logger.info("Adding NM %s to DIM %s", nm_ip, dim_ip) + self.mm_client.add_node_to_dim(dim_ip, nm_ip) + self.assignments[nm] = dim_ip + elif self.assignments[nm] not in self.DIMs: # If we've removed a DIM + logger.info("Re-assigning %s to DIM %s", nm_ip, dim_ip) + self.mm_client.add_node_to_dim(dim_ip, nm_ip) + self.assignments[nm] = dim_ip + else: # We have lots of DIMs + # Will do nothing, it's up to the user/deployer to handle this. 
+ logger.info("Multiple DIMs, handle node assignments externally.") + pass diff --git a/daliuge-engine/test/manager/test_daemon.py b/daliuge-engine/test/manager/test_daemon.py index 59a090a8e..ebb3d3b2f 100644 --- a/daliuge-engine/test/manager/test_daemon.py +++ b/daliuge-engine/test/manager/test_daemon.py @@ -27,13 +27,50 @@ from dlg import utils, restutils from dlg.manager import constants -from dlg.manager.client import MasterManagerClient +from dlg.manager.client import MasterManagerClient, DataIslandManagerClient from dlg.manager.proc_daemon import DlgDaemon from six.moves import http_client as httplib # @UnresolvedImport _TIMEOUT = 10 +def _wait_until(update_condition, test_condition, timeout, interval=0.1, *args): + timeout_time = time.time() + timeout + output = None + while not test_condition(output) and time.time() < timeout_time: + output = update_condition(*args) + time.sleep(interval) + return output + + +def _get_dims_from_client(timeout, client): + def _update_nodes(*args): + return args[0].dims() + + def _test_dims(_dims): + return _dims + + dims = _wait_until(_update_nodes, _test_dims, timeout, 0.1, client) + return dims + + +def _get_nodes_from_client(timeout, client): + def _update_nodes(*args): + return args[0].nodes() + + def _test_nodes(_nodes): + if not _nodes or len(_nodes) == 0: + return False + return _nodes + + nodes = _wait_until(_update_nodes, _test_nodes, timeout, 0.1, client) + return nodes + + +def _update_nodes_with_timeout(*args): + return _get_nodes_from_client(_TIMEOUT, args[0]) + + class TestDaemon(unittest.TestCase): def create_daemon(self, *args, **kwargs): self._daemon_t = None @@ -120,7 +157,8 @@ def test_zeroconf_discovery(self): # Both managers started fine. If they zeroconf themselves correctly then # if we query the MM it should know about its nodes, which should have # one element - nodes = self._get_nodes_from_master(_TIMEOUT) + mc = MasterManagerClient() + nodes = _get_nodes_from_client(_TIMEOUT, mc) self.assertIsNotNone(nodes) self.assertEqual( 1, @@ -128,6 +166,92 @@ def test_zeroconf_discovery(self): "MasterManager didn't find the NodeManager running on the same node", ) + def _test_zeroconf_dim_mm(self, disable_zeroconf=False): + # Start daemon with no master and no NM + self.create_daemon(master=False, noNM=True, disable_zeroconf=disable_zeroconf) + # Start DIM - now, on it's own + self._start("island", http.HTTPStatus.OK, {"nodes": []}) + # Start daemon with master but no NM + self._start("master", http.HTTPStatus.OK) + # Check that dim registers to MM + dims = None + mc = MasterManagerClient() + + def _update_dims(*args): + _dims = _get_dims_from_client(_TIMEOUT, args[0]) + return _dims + + def _test_dims(_dims): + if dims is not None and len(dims["islands"]) > 0: + return dims + else: + return False + + dims = _wait_until(_update_dims, _test_dims, _TIMEOUT, 0.1, mc) + self.assertIsNotNone(dims) + return dims + + def test_zeroconf_dim_mm(self): + dims = self._test_zeroconf_dim_mm(disable_zeroconf=False) + self.assertEqual( + 1, + len(dims["islands"]), + "MasterManager didn't find the DataIslandManager with zeroconf", + ) + + def test_without_zeroconf_dim_mm(self): + dims = self._test_zeroconf_dim_mm(disable_zeroconf=True) + self.assertEqual( + 0, + len(dims["islands"]), + "MasterManager found the DataIslandManager without zeroconf!?", + ) + + def _add_zeroconf_nm(self): + self._start("node", http.HTTPStatus.OK) + mc = MasterManagerClient() + + def _test_nodes(_nodes): + if _nodes is not None and len(_nodes) > 0: + return _nodes + else: 
+                return False
+
+        nodes = _wait_until(_update_nodes_with_timeout, _test_nodes, _TIMEOUT, 0.1, mc)
+        return nodes
+
+    def test_zeroconf_dim_nm_setup(self):
+        """
+        Sets up a mm with a node manager
+        Sets up a DIM with zeroconf discovery
+        Asserts that the mm attaches the nm to the discovered dim
+        """
+        self._test_zeroconf_dim_mm(disable_zeroconf=False)
+        nodes = self._add_zeroconf_nm()
+        self.assertIsNotNone(nodes)
+
+    def test_without_zeroconf_dim_nm_setup(self):
+        self._test_zeroconf_dim_mm(disable_zeroconf=True)
+        nodes = self._add_zeroconf_nm()['nodes']
+        self.assertEqual(0, len(nodes))
+
+    def test_zeroconf_nm_down(self):
+        self._test_zeroconf_dim_mm(disable_zeroconf=False)
+        nodes = self._add_zeroconf_nm()
+        self.assertIsNotNone(nodes)
+        self._stop("node", http.HTTPStatus.OK)
+        mc = MasterManagerClient()
+
+        def _test_nodes(_nodes):
+            if not _nodes:
+                return False
+            if not _nodes['nodes']:
+                return _nodes['nodes']
+            return False
+
+        new_nodes = _wait_until(_update_nodes_with_timeout, _test_nodes, _TIMEOUT, 0.1, mc)['nodes']
+        self.assertEqual(0, len(new_nodes))
+
     def test_start_dataisland_via_rest(self):
         self.create_daemon(master=True, noNM=False, disable_zeroconf=False)
 
@@ -135,7 +259,8 @@ def test_start_dataisland_via_rest(self):
         # Both managers started fine. If they zeroconf themselves correctly then
         # if we query the MM it should know about its nodes, which should have
         # one element
-        nodes = self._get_nodes_from_master(_TIMEOUT)
+        mc = MasterManagerClient()
+        nodes = _get_nodes_from_client(_TIMEOUT, mc)
         self.assertIsNotNone(nodes)
         self.assertEqual(
             1,
@@ -154,13 +279,15 @@ def test_stop_dataisland_via_rest(self):
 
         # start master and island manager
         self.create_daemon(master=True, noNM=False, disable_zeroconf=False)
-        nodes = self._get_nodes_from_master(_TIMEOUT)
+        mc = MasterManagerClient()
+        nodes = _get_nodes_from_client(_TIMEOUT, mc)
         self._start("island", http.HTTPStatus.OK, {"nodes": nodes})
 
         # Both managers started fine. If they zeroconf themselves correctly then
         # if we query the MM it should know about its nodes, which should have
         # one element
-        nodes = self._get_nodes_from_master(_TIMEOUT)
+        mc = MasterManagerClient()
+        nodes = _get_nodes_from_client(_TIMEOUT, mc)
         self.assertIsNotNone(nodes)
         self.assertEqual(
             1,
@@ -183,7 +310,8 @@ def test_stop_start_node_via_rest(self):
         # Both managers started fine. If they zeroconf themselves correctly then
         # if we query the MM it should know about its nodes, which should have
         # one element
-        nodes = self._get_nodes_from_master(_TIMEOUT)
+        mc = MasterManagerClient()
+        nodes = _get_nodes_from_client(_TIMEOUT, mc)
         self.assertIsNotNone(nodes)
         self.assertEqual(
             1,
@@ -222,6 +350,18 @@ def test_start_stop_master_via_rest(self):
             "The MM did not stop successfully",
         )
 
+    def test_get_dims(self):
+        self.create_daemon(master=True, noNM=True, disable_zeroconf=False)
+        # Check that the MM starts without any registered DIMs
+        mc = MasterManagerClient()
+        dims = _get_dims_from_client(_TIMEOUT, mc)
+        self.assertIsNotNone(dims)
+        self.assertEqual(
+            0,
+            len(dims["islands"]),
+            "MasterManager reported a DataIslandManager even though none was started",
+        )
+
     def _start(self, manager_name, expected_code, payload=None):
         conn = http.client.HTTPConnection("localhost", 9000)
         headers = {}
@@ -230,7 +370,7 @@ def _start(self, manager_name, expected_code, payload=None):
             headers["Content-Type"] = "application/json"
         conn.request(
             "POST",
-            "/managers/%s/start" % (manager_name,),
+            f"/managers/{manager_name}/start",
             body=payload,
             headers=headers,
         )
@@ -246,18 +386,9 @@ def _stop(self, manager_name, expected_code, payload=None):
             payload = json.dumps(payload)
             headers["Content-Type"] = "application/json"
         conn.request(
-            "POST", "/managers/%s/stop" % (manager_name,), body=payload, headers=headers
+            "POST", f"/managers/{manager_name}/stop", body=payload, headers=headers
         )
         response = conn.getresponse()
         self.assertEqual(expected_code, response.status, response.read())
         response.close()
         conn.close()
-
-    def _get_nodes_from_master(self, timeout):
-        mc = MasterManagerClient()
-        timeout_time = time.time() + timeout
-        while time.time() < timeout_time:
-            nodes = mc.nodes()
-            if nodes:
-                return nodes
-            time.sleep(0.1)
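Editor's note: as a usage illustration only (not part of the patch), the sketch below shows how the DIM-management API added above could be driven through the extended MasterManagerClient. The IP addresses are placeholders, and it assumes a Master Manager is already listening on its default REST port.

    from dlg.manager.client import MasterManagerClient

    # Assumption: a Master Manager is reachable on localhost at the default port.
    mc = MasterManagerClient(host="localhost")

    # Register a DIM and an NM with the MM, then attach the NM to the DIM.
    # This mirrors what NMAssigner does when zeroconf discovers the managers.
    mc.add_dim("10.0.0.10")            # placeholder DIM address
    mc.add_node("10.0.0.11")           # placeholder NM address
    mc.add_node_to_dim("10.0.0.10", "10.0.0.11")

    print(mc.dims())                   # e.g. {"islands": ["10.0.0.10"]}
    print(mc.nodes())                  # NMs currently known to the MM

    # Tear everything down again in reverse order.
    mc.remove_node_from_dim("10.0.0.10", "10.0.0.11")
    mc.remove_node("10.0.0.11")
    mc.remove_dim("10.0.0.10")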