Commit d07bfe8

Merge 14931a9 into a47cb17
pritchardn committed May 25, 2022
2 parents a47cb17 + 14931a9, commit d07bfe8
Showing 6 changed files with 369 additions and 59 deletions.
43 changes: 38 additions & 5 deletions daliuge-common/dlg/clients.py
@@ -26,7 +26,6 @@
from . import constants
from .restutils import RestClient


logger = logging.getLogger(__name__)
compress = os.environ.get("DALIUGE_COMPRESSED_JSON", True)

@@ -137,7 +136,7 @@ def sessions(self):
"Successfully read %d sessions from %s:%s",
len(sessions),
self.host,
self.port,
self.port
)
return sessions

@@ -223,7 +222,7 @@ class NodeManagerClient(BaseDROPManagerClient):
"""

def __init__(
self, host="localhost", port=constants.NODE_DEFAULT_REST_PORT, timeout=10
):
super(NodeManagerClient, self).__init__(host=host, port=port, timeout=timeout)

@@ -259,7 +258,7 @@ class DataIslandManagerClient(CompositeManagerClient):
"""

def __init__(
self, host="localhost", port=constants.ISLAND_DEFAULT_REST_PORT, timeout=10
):
super(DataIslandManagerClient, self).__init__(
host=host, port=port, timeout=timeout
@@ -272,11 +271,45 @@ class MasterManagerClient(CompositeManagerClient):
"""

def __init__(
self, host="localhost", port=constants.MASTER_DEFAULT_REST_PORT, timeout=10
):
super(MasterManagerClient, self).__init__(host=host, port=port, timeout=timeout)

def create_island(self, island_host, nodes):
self._post_json(
"/managers/%s/dataisland" % (quote(island_host)), {"nodes": nodes}
)

def dims(self):
return self._get_json("/islands")

def add_dim(self, dim):
self._POST(f"/islands/{dim}", content=None)

def remove_dim(self, dim):
self._DELETE("/islands/%s" % (dim,))

def add_node_to_dim(self, dim, nm):
"""
Adds a node manager (NM) to a data island manager (DIM).
"""
self._POST(
"/managers/%s/nodes/%s" % (dim, nm),
content=None,
)

def remove_node_from_dim(self, dim, nm):
"""
Removes a node manager (NM) from a data island manager (DIM).
"""
self._DELETE("/managers/%s/nodes/%s" % (dim, nm))
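The new MasterManagerClient methods map directly onto the Master Manager's REST endpoints for managing Data Island Managers. A minimal usage sketch follows; the host names, and the assumption that a Master Manager is already listening on its default REST port, are illustrative and not part of this commit:

from dlg.clients import MasterManagerClient

# Assumes a Master Manager is reachable on localhost at its default REST port.
mm = MasterManagerClient(host="localhost")

mm.add_dim("dim-host-1")                        # register a DIM with the MM
mm.add_node_to_dim("dim-host-1", "nm-host-1")   # attach an NM to that DIM
print(mm.dims())                                # list the DIMs the MM knows about

mm.remove_node_from_dim("dim-host-1", "nm-host-1")
mm.remove_dim("dim-host-1")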
10 changes: 7 additions & 3 deletions daliuge-engine/dlg/manager/composite_manager.py
@@ -211,6 +211,10 @@ def dmHosts(self):
def addDmHost(self, host):
self._dmHosts.append(host)

def removeDmHost(self, host):
if host in self._dmHosts:
self._dmHosts.remove(host)

@property
def nodes(self):
return self._nodes[:]
@@ -347,7 +351,7 @@ def addGraphSpec(self, sessionId, graphSpec):
# belong to the same host, so we can submit that graph into the individual
# DMs. For this we need to make sure that our graph has the correct
# attribute set
logger.info(f"Separating graph using partition attribute {self._partitionAttr}")
logger.info("Separating graph using partition attribute %s", self._partitionAttr)
perPartition = collections.defaultdict(list)
if "rmode" in graphSpec[-1]:
init_pg_repro_data(graphSpec)
@@ -382,7 +386,7 @@ def addGraphSpec(self, sessionId, graphSpec):
# At each partition the relationships between DROPs should be local at the
# moment of submitting the graph; thus we record the inter-partition
# relationships separately and remove them from the original graph spec
logger.info(f"Graph splitted into {perPartition.keys()}")
logger.info("Graph split into %r", perPartition.keys())
inter_partition_rels = []
for dropSpecs in perPartition.values():
inter_partition_rels += graph_loader.removeUnmetRelationships(dropSpecs)
@@ -448,7 +452,7 @@ def deploySession(self, sessionId, completedDrops=[]):
)
logger.info("Delivered node subscription list to node managers")
logger.debug(
"Number of subscriptions: %s" % len(self._drop_rels[sessionId].items())
"Number of subscriptions: %s", len(self._drop_rels[sessionId].items())
)

logger.info("Deploying Session %s in all hosts", sessionId)
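The logging changes in this file, and the similar ones in proc_daemon.py below, all replace eager f-string or % interpolation with lazy %-style arguments, so the message is only formatted when the record is actually emitted. A small standalone illustration, not part of the commit:

import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
per_partition = {"host1": [], "host2": []}  # hypothetical stand-in data

# Eager: the f-string is built even though INFO is disabled here.
logger.info(f"Graph split into {per_partition.keys()}")

# Lazy: formatting is deferred to the logging framework and skipped
# entirely when the level is disabled.
logger.info("Graph split into %r", per_partition.keys())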
89 changes: 65 additions & 24 deletions daliuge-engine/dlg/manager/proc_daemon.py
@@ -37,6 +37,7 @@
from . import constants, client
from .. import utils
from ..restserver import RestServer
from dlg.nm_dim_assigner import NMAssigner

logger = logging.getLogger(__name__)

@@ -74,7 +75,6 @@ def __init__(self, master=False, noNM=False, disable_zeroconf=False, verbosity=0

self._shutting_down = False
self._verbosity = verbosity

# The three processes we run
self._nm_proc = None
self._dim_proc = None
@@ -84,6 +84,7 @@ def __init__(self, master=False, noNM=False, disable_zeroconf=False, verbosity=0
self._zeroconf = None if disable_zeroconf else zc.Zeroconf()
self._nm_info = None
self._mm_browser = None
self._mm_dim_browser = None

# Starting managers
app = self.app
@@ -128,6 +129,9 @@ def _stop_zeroconf(self):
if self._mm_browser:
self._mm_browser.cancel()
self._mm_browser.join()
if self._mm_dim_browser:
self._mm_dim_browser.cancel()
self._mm_dim_browser.join()
self._zeroconf.close()
logger.info("Zeroconf stopped")

@@ -176,15 +180,15 @@ def startNM(self):
tool = get_tool()
args = ["--host", "0.0.0.0"]
args += self._verbosity_as_cmdline()
logger.info("Starting Node Drop Manager with args: %s" % (" ".join(args)))
logger.info("Starting Node Drop Manager with args: %s", (" ".join(args)))
self._nm_proc = tool.start_process("nm", args)
logger.info("Started Node Drop Manager with PID %d" % (self._nm_proc.pid))
logger.info("Started Node Drop Manager with PID %d", self._nm_proc.pid)

# Registering the new NodeManager via zeroconf so it gets discovered
# by the Master Manager
if self._zeroconf:
addrs = utils.get_local_ip_addr()
logger.info("Registering this NM with zeroconf: %s" % addrs)
logger.info("Registering this NM with zeroconf: %s", addrs)
self._nm_info = utils.register_service(
self._zeroconf,
"NodeManager",
@@ -201,58 +205,95 @@ def startDIM(self, nodes):
if nodes:
args += ["--nodes", ",".join(nodes)]
logger.info(
"Starting Data Island Drop Manager with args: %s" % (" ".join(args))
"Starting Data Island Drop Manager with args: %s", (" ".join(args))
)
self._dim_proc = tool.start_process("dim", args)
logger.info(
"Started Data Island Drop Manager with PID %d" % (self._dim_proc.pid)
"Started Data Island Drop Manager with PID %d", self._dim_proc.pid
)

# Registering the new DIM via zeroconf so it gets discovered
# by the Master Manager
if self._zeroconf:
addrs = utils.get_local_ip_addr()
logger.info("Registering this DIM with zeroconf: %s", addrs)
self._nm_info = utils.register_service(
self._zeroconf,
"DIM",
socket.gethostname(),
addrs[0][0],
constants.ISLAND_DEFAULT_REST_PORT,
)
return

def startMM(self):
tool = get_tool()
args = ["--host", "0.0.0.0"]
args += self._verbosity_as_cmdline()
logger.info("Starting Master Drop Manager with args: %s" % (" ".join(args)))
logger.info("Starting Master Drop Manager with args: %s", (" ".join(args)))
self._mm_proc = tool.start_process("mm", args)
logger.info("Started Master Drop Manager with PID %d" % (self._mm_proc.pid))
logger.info("Started Master Drop Manager with PID %d", self._mm_proc.pid)

# Also subscribe to zeroconf events coming from NodeManagers and feed
# the Master Manager with the new hosts we find
if self._zeroconf:
mm_client = client.MasterManagerClient()
node_managers = {}
nm_assigner = NMAssigner()

def nm_callback(zeroconf, service_type, name, state_change):
info = zeroconf.get_service_info(service_type, name)
if state_change is zc.ServiceStateChange.Added:
server = socket.inet_ntoa(_get_address(info))
port = info.port
node_managers[name] = (server, port)
nm_assigner.add_nm(name, server, port)
logger.info(
"Found a new Node Manager on %s:%d, will add it to the MM"
% (server, port)
"Found a new Node Manager on %s:%d, will add it to the MM",
server, port,
)
mm_client.add_node(server)
elif state_change is zc.ServiceStateChange.Removed:
server, port = node_managers[name]
server, port = nm_assigner.NMs[name]
logger.info(
"Node Manager on %s:%d disappeared, removing it from the MM"
% (server, port)
"Node Manager on %s:%d disappeared, removing it from the MM",
server, port,
)

# Don't bother to remove it if we're shutting down. This way
# we avoid hanging in here if the MM is down already but
# we are trying to remove our NM, which has just disappeared
if not self._shutting_down:
try:
mm_client.remove_node(server)
finally:
del node_managers[name]
nm_assigner.remove_nm(name)

def dim_callback(zeroconf, service_type, name, state_change):
info = zeroconf.get_service_info(service_type, name)
if state_change is zc.ServiceStateChange.Added:
server = socket.inet_ntoa(_get_address(info))
port = info.port
nm_assigner.add_dim(name, server, port)
logger.info(
"Found a new Data Island Manager on %s:%d, will add it to the MM",
server, port,
)
elif state_change is zc.ServiceStateChange.Removed:
server, port = nm_assigner.DIMs[name]
logger.info(
"Data Island Manager on %s:%d disappeared, removing it from the MM",
server, port,
)

# Don't bother to remove it if we're shutting down. This way
# we avoid hanging in here if the MM is down already but
# we are trying to remove our DIM, which has just disappeared
if not self._shutting_down:
nm_assigner.remove_dim(name)

self._mm_browser = utils.browse_service(
self._zeroconf, "NodeManager", "tcp", nm_callback
)
self._mm_dim_browser = utils.browse_service(
self._zeroconf,
"DIM",
"tcp",
dim_callback, # DIM since name must be < 15 bytes
)
logger.info("Zeroconf started")
return

@@ -278,7 +319,7 @@ def _rest_stop_manager(self, proc, stop_method):
def _rest_get_manager_info(self, proc):
if proc:
bottle.response.content_type = "application/json"
logger.info("Sending response: %s" % json.dumps({"pid": proc.pid}))
logger.info("Sending response: %s", json.dumps({"pid": proc.pid}))
return json.dumps({"pid": proc.pid})
else:
return json.dumps({"pid": None})
@@ -296,7 +337,7 @@ def rest_getMgrs(self):
if mgrs["node"]:
mgrs["node"] = self._nm_proc.pid

logger.info("Sending response: %s" % json.dumps(mgrs))
logger.info("Sending response: %s", json.dumps(mgrs))
return json.dumps(mgrs)

def rest_startNM(self):
@@ -396,7 +437,7 @@ def handle_signal(signalNo, stack_frame):
global terminating
if terminating:
return
logger.info("Received signal %d, will stop the daemon now" % (signalNo,))
logger.info("Received signal %d, will stop the daemon now", signalNo)
terminating = True
daemon.stop(10)

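The NM-to-DIM pairing logic itself lives in dlg/nm_dim_assigner.py, which is not among the files shown on this page. Judging only from the calls above (add_nm, remove_nm, add_dim, remove_dim, and the NMs and DIMs mappings), the class could look roughly like the following sketch; the actual assignment behaviour is an assumption:

class NMAssigner:
    """Tracks discovered Node Managers (NMs) and Data Island Managers (DIMs)."""

    def __init__(self):
        self.NMs = {}   # zeroconf service name -> (server, port)
        self.DIMs = {}  # zeroconf service name -> (server, port)

    def add_nm(self, name, server, port):
        self.NMs[name] = (server, port)
        # The real implementation presumably (re)assigns NMs to DIMs here,
        # e.g. via MasterManagerClient.add_node_to_dim().

    def remove_nm(self, name):
        self.NMs.pop(name, None)

    def add_dim(self, name, server, port):
        self.DIMs[name] = (server, port)

    def remove_dim(self, name):
        self.DIMs.pop(name, None)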
(Diffs for the remaining 3 changed files are not shown.)