Skip to content

Commit

Permalink
Introduces node-manager-assigner class for Mastermanager to use for assigning node-managers to dims.
Browse files Browse the repository at this point in the history

Replicates current logic, without performing actual assignments.
  • Loading branch information
pritchardn committed May 19, 2022
1 parent 8a3ffdc commit 1d24edc
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 17 deletions.
25 changes: 8 additions & 17 deletions daliuge-engine/dlg/manager/proc_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from . import constants, client
from .. import utils
from ..restserver import RestServer
from dlg.nm_dim_assigner import NMAssigner

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -236,23 +237,20 @@ def startMM(self):
# Also subscribe to zeroconf events coming from NodeManagers and feed
# the Master Manager with the new hosts we find
if self._zeroconf:
mm_client = client.MasterManagerClient()
node_managers = {}
dims = {}
nm_assigner = NMAssigner()

def nm_callback(zeroconf, service_type, name, state_change):
info = zeroconf.get_service_info(service_type, name)
if state_change is zc.ServiceStateChange.Added:
server = socket.inet_ntoa(_get_address(info))
port = info.port
node_managers[name] = (server, port)
nm_assigner.add_nm(name, server, port)
logger.info(
"Found a new Node Manager on %s:%d, will add it to the MM"
% (server, port)
)
mm_client.add_node(server)
elif state_change is zc.ServiceStateChange.Removed:
server, port = node_managers[name]
server, port = nm_assigner.NMs[name]
logger.info(
"Node Manager on %s:%d disappeared, removing it from the MM"
% (server, port)
Expand All @@ -262,24 +260,20 @@ def nm_callback(zeroconf, service_type, name, state_change):
# we avoid hanging in here if the MM is down already but
# we are trying to remove our NM who has just disappeared
if not self._shutting_down:
try:
mm_client.remove_node(server)
finally:
del node_managers[name]
nm_assigner.remove_nm(name)

def dim_callback(zeroconf, service_type, name, state_change):
info = zeroconf.get_service_info(service_type, name)
if state_change is zc.ServiceStateChange.Added:
server = socket.inet_ntoa(_get_address(info))
port = info.port
dims[name] = (server, port)
nm_assigner.add_dim(name, server, port)
logger.info(
"Found a new Data Island Manager on %s:%d, will add it to the MM"
% (server, port)
)
mm_client.add_dim(server)
elif state_change is zc.ServiceStateChange.Removed:
server, port = dims[name]
server, port = nm_assigner.DIMs[name]
logger.info(
"Data Island Manager on %s:%d disappeared, removing it from the MM"
% (server, port)
Expand All @@ -289,10 +283,7 @@ def dim_callback(zeroconf, service_type, name, state_change):
# we avoid hanging in here if the MM is down already but
# we are trying to remove our NM who has just disappeared
if not self._shutting_down:
try:
mm_client.remove_dim(server)
finally:
del dims[name]
nm_assigner.remove_dim(name)

self._mm_browser = utils.browse_service(
self._zeroconf, "NodeManager", "tcp", nm_callback
Expand Down
59 changes: 59 additions & 0 deletions daliuge-engine/dlg/nm_dim_assigner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import logging
from dlg.manager import client

logger = logging.getLogger(__name__)


class NMAssigner:
    """Track discovered Node Managers (NMs) and Data Island Managers (DIMs).

    Every addition or removal is forwarded to the Master Manager through a
    ``MasterManagerClient``. Allocation of NMs to DIMs is currently only
    logged: nothing in this class records an assignment yet.
    """

    def __init__(self):
        """Create empty registries and the Master Manager client."""
        self.DIMs = {}  # DIM name -> (server, port)
        self.NMs = {}  # NM name -> (server, port)
        self.assignments = {}  # NM name -> DIM name
        self.mm_client = client.MasterManagerClient()

    def add_dim(self, name, server, port):
        """Register a DIM, announce it to the Master Manager and re-allocate."""
        self.DIMs[name] = (server, port)
        self.mm_client.add_dim(server)
        self.allocate_nms()

    def add_nm(self, name, server, port):
        """Register an NM, announce it to the Master Manager and re-allocate."""
        self.NMs[name] = (server, port)
        self.mm_client.add_node(server)
        self.allocate_nms()

    def remove_dim(self, name):
        """Forget the DIM called ``name`` and remove it from the Master Manager.

        The local entry is deleted even when the Master Manager call fails.

        :return: the ``(server, port)`` pair the DIM was registered with.
        :raises KeyError: if ``name`` is not a registered DIM.
        """
        server, port = self.DIMs[name]
        try:
            self.mm_client.remove_dim(server)
            # TODO: Handle removing assignments
        finally:
            del self.DIMs[name]
        return server, port

    def remove_nm(self, name):
        """Forget the NM called ``name`` and remove it from the Master Manager.

        The local entry is deleted even when the Master Manager call fails.

        :return: the ``(server, port)`` pair the NM was registered with,
            mirroring ``remove_dim``.
        :raises KeyError: if ``name`` is not a registered NM.
        """
        server, port = self.NMs[name]
        try:
            self.mm_client.remove_node(server)
            # TODO: Let the owning DIM know the NM has gone
        finally:
            del self.NMs[name]
            # An NM that no longer exists cannot keep an assignment.
            self.assignments.pop(name, None)
        return server, port

    def allocate_nms(self):
        """Log the NM-to-DIM allocation implied by the current registries.

        * No DIM known: every NM with a recorded assignment is reported.
        * Exactly one DIM: every unassigned NM, and every NM whose recorded
          DIM is no longer registered, is reported as going to that DIM.
        * Several DIMs: nothing is done; the deployer has to handle this.
        """
        if not self.DIMs:
            for nm in self.assignments:
                logger.info("Letting NM %s know they have no DIM", nm)
                # TODO: let NMs know they have no DIM
        elif len(self.DIMs) == 1:
            dim = next(iter(self.DIMs))
            for nm in self.NMs:
                if nm not in self.assignments:
                    logger.info("Adding NM %s to DIM %s", nm, dim)
                    # TODO: Actually add the nm
                elif self.assignments[nm] not in self.DIMs:
                    # The DIM this NM was assigned to has been removed
                    logger.info("Re-assigning %s to DIM %s", nm, dim)
                    # TODO: Actually re-assign the dim
        # With several DIMs it is up to the user/deployer to handle this.

0 comments on commit 1d24edc

Please sign in to comment.