Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Liu 186 post changes #170

Merged
merged 2 commits into from
Jun 3, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 44 additions & 84 deletions daliuge-engine/test/manager/test_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,48 +27,28 @@

from dlg import utils, restutils
from dlg.manager import constants
from dlg.manager.client import MasterManagerClient, DataIslandManagerClient
from dlg.manager.client import MasterManagerClient
from dlg.manager.proc_daemon import DlgDaemon
from six.moves import http_client as httplib # @UnresolvedImport

_TIMEOUT = 10
IDENTITY = lambda x: x


def _wait_until(update_condition, test_condition, timeout, interval=0.1, *args):
def wait_until(update_condition, test_condition=IDENTITY, timeout=_TIMEOUT, interval=0.1):
timeout_time = time.time() + timeout
output = None
while not test_condition(output) and time.time() < timeout_time:
output = update_condition(*args)
while time.time() < timeout_time:
output = update_condition()
if test_condition(output):
return output
time.sleep(interval)
return output


def _get_dims_from_client(timeout, client):
def _update_nodes(*args):
return args[0].dims()

def _test_dims(_dims):
return _dims

dims = _wait_until(_update_nodes, _test_dims, timeout, 0.1, client)
return dims

return None

def _get_nodes_from_client(timeout, client):
def _update_nodes(*args):
return args[0].nodes()

def _test_nodes(_nodes):
if not _nodes or len(_nodes) == 0:
return False
return _nodes
def _get_dims_from_client(client, **kwargs):
return wait_until(lambda: client.dims(), **kwargs)

nodes = _wait_until(_update_nodes, _test_nodes, timeout, 0.1, client)
return nodes


def _update_nodes_with_timeout(*args):
return _get_nodes_from_client(_TIMEOUT, args[0])
def _get_nodes_from_client(client, **kwargs):
return wait_until(lambda: client.nodes(), **kwargs)


class TestDaemon(unittest.TestCase):
Expand Down Expand Up @@ -141,12 +121,12 @@ def test_mm_starts(self):
def test_nothing_starts(self):
# Nothing should start now
self.create_daemon(master=False, noNM=True, disable_zeroconf=True)
self.assertFalse(
utils.portIsOpen("localhost", constants.NODE_DEFAULT_REST_PORT, 0),
self.assertTrue(
utils.portIsClosed("localhost", constants.NODE_DEFAULT_REST_PORT, 0),
"NM started but it should not have",
)
self.assertFalse(
utils.portIsOpen("localhost", constants.MASTER_DEFAULT_REST_PORT, 0),
self.assertTrue(
utils.portIsClosed("localhost", constants.MASTER_DEFAULT_REST_PORT, 0),
"NM started but it should not have",
)

Expand All @@ -158,7 +138,7 @@ def test_zeroconf_discovery(self):
# if we query the MM it should know about its nodes, which should have
# one element
mc = MasterManagerClient()
nodes = _get_nodes_from_client(_TIMEOUT, mc)
nodes = _get_nodes_from_client(mc)
self.assertIsNotNone(nodes)
self.assertEqual(
1,
Expand All @@ -167,29 +147,20 @@ def test_zeroconf_discovery(self):
)

def _test_zeroconf_dim_mm(self, disable_zeroconf=False):
# Start daemon with no master and no NM
# Start an empty daemon, then a DIM and a Master on their own
self.create_daemon(master=False, noNM=True, disable_zeroconf=disable_zeroconf)
# Start DIM - now, on it's own
self._start("island", http.HTTPStatus.OK, {"nodes": []})
# Start daemon with master but no NM
self._start("master", http.HTTPStatus.OK)
# Check that dim registers to MM
dims = None
mc = MasterManagerClient()

def _update_dims(*args):
_dims = _get_dims_from_client(_TIMEOUT, args[0])
return _dims

def _test_dims(_dims):
if dims is not None and len(dims["islands"]) > 0:
return dims
else:
return False

dims = _wait_until(_update_dims, _test_dims, _TIMEOUT, 0.1, mc)
self.assertIsNotNone(dims)
return dims
# Check that dim registers to MM when using zeroconf
mc = MasterManagerClient()
if not disable_zeroconf:
def _test_dims(dims):
return dims and dims["islands"]
dims = _get_dims_from_client(mc, test_condition=_test_dims)
self.assertIsNotNone(dims)
return dims
return mc.dims()

def test_zeroconf_dim_mm(self):
dims = self._test_zeroconf_dim_mm(disable_zeroconf=False)
Expand All @@ -210,15 +181,7 @@ def test_without_zeroconf_dim_mm(self):
def _add_zeroconf_nm(self):
self._start("node", http.HTTPStatus.OK)
mc = MasterManagerClient()

def _test_nodes(_nodes):
if _nodes is not None and len(_nodes) > 0:
return _nodes
else:
return False

nodes = _wait_until(_update_nodes_with_timeout, _test_nodes, _TIMEOUT, 0.1, mc)
return nodes
return _get_nodes_from_client(mc)

def test_zeroconf_dim_nm_setup(self):
"""
Expand All @@ -242,14 +205,10 @@ def test_zeroconf_nm_down(self):
self._stop("node", http.HTTPStatus.OK)
mc = MasterManagerClient()

def _test_nodes(_nodes):
if not _nodes:
return False
if not _nodes['nodes']:
return _nodes['nodes']
return False
def _test_nodes(nodes):
return not nodes['nodes']

new_nodes = _wait_until(_update_nodes_with_timeout, _test_nodes, _TIMEOUT, 0.1, mc)['nodes']
new_nodes = _get_nodes_from_client(mc, test_condition=_test_nodes)['nodes']
self.assertEqual(0, len(new_nodes))

def test_start_dataisland_via_rest(self):
Expand All @@ -260,7 +219,7 @@ def test_start_dataisland_via_rest(self):
# if we query the MM it should know about its nodes, which should have
# one element
mc = MasterManagerClient()
nodes = _get_nodes_from_client(_TIMEOUT, mc)
nodes = _get_nodes_from_client(mc)
self.assertIsNotNone(nodes)
self.assertEqual(
1,
Expand All @@ -280,14 +239,14 @@ def test_stop_dataisland_via_rest(self):
# start master and island manager
self.create_daemon(master=True, noNM=False, disable_zeroconf=False)
mc = MasterManagerClient()
nodes = _get_nodes_from_client(_TIMEOUT, mc)
nodes = _get_nodes_from_client(mc)
self._start("island", http.HTTPStatus.OK, {"nodes": nodes})

# Both managers started fine. If they zeroconf themselves correctly then
# if we query the MM it should know about its nodes, which should have
# one element
mc = MasterManagerClient()
nodes = _get_nodes_from_client(_TIMEOUT, mc)
nodes = _get_nodes_from_client(mc)
self.assertIsNotNone(nodes)
self.assertEqual(
1,
Expand All @@ -297,8 +256,8 @@ def test_stop_dataisland_via_rest(self):

# Check that the DataIsland stopped
self._stop("island", http.HTTPStatus.OK, "")
self.assertFalse(
utils.portIsOpen("localhost", constants.ISLAND_DEFAULT_REST_PORT, _TIMEOUT),
self.assertTrue(
utils.portIsClosed("localhost", constants.ISLAND_DEFAULT_REST_PORT, _TIMEOUT),
"The DIM did not stop successfully",
)

Expand All @@ -311,7 +270,7 @@ def test_stop_start_node_via_rest(self):
# if we query the MM it should know about its nodes, which should have
# one element
mc = MasterManagerClient()
nodes = _get_nodes_from_client(_TIMEOUT, mc)
nodes = _get_nodes_from_client(mc)
self.assertIsNotNone(nodes)
self.assertEqual(
1,
Expand All @@ -321,14 +280,15 @@ def test_stop_start_node_via_rest(self):

# Check that the NM stops
self._stop("node", http.HTTPStatus.OK, "")
self.assertFalse(
utils.portIsOpen("localhost", constants.NODE_DEFAULT_REST_PORT, _TIMEOUT),
self.assertTrue(
utils.portIsClosed('localhost', constants.NODE_DEFAULT_REST_PORT, _TIMEOUT),
"The node did not stop successfully",
)

# Check that the NM starts
self._start("node", http.HTTPStatus.OK, {"pid": nodes})
self.assertTrue(
utils.portIsOpen("localhost", constants.NODE_DEFAULT_REST_PORT, _TIMEOUT),
utils.portIsOpen('localhost', constants.NODE_DEFAULT_REST_PORT, _TIMEOUT),
"The node did not start successfully",
)

Expand All @@ -345,16 +305,16 @@ def test_start_stop_master_via_rest(self):

# Check that the MM stops
self._stop("master", http.HTTPStatus.OK, "")
self.assertFalse(
utils.portIsOpen("localhost", constants.MASTER_DEFAULT_REST_PORT, _TIMEOUT),
self.assertTrue(
utils.portIsClosed("localhost", constants.MASTER_DEFAULT_REST_PORT, _TIMEOUT),
"The MM did not stop successfully",
)

def test_get_dims(self):
self.create_daemon(master=True, noNM=True, disable_zeroconf=False)
# Check that the DataIsland starts with the given nodes
mc = MasterManagerClient()
dims = _get_dims_from_client(_TIMEOUT, mc)
dims = _get_dims_from_client(mc)
self.assertIsNotNone(dims)
self.assertEqual(
0,
Expand Down