Skip to content

Commit

Permalink
Tests serial dm first, then parallel.
Browse files Browse the repository at this point in the history
  • Loading branch information
pritchardn committed Nov 3, 2021
1 parent 1a2a260 commit cfae921
Showing 1 changed file with 17 additions and 17 deletions.
34 changes: 17 additions & 17 deletions daliuge-engine/test/manager/test_dm.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def _test_runGraphInTwoNMs(
return leaf_drop_data


class TestDMParallel(NMTestsMixIn, unittest.TestCase):
class TestDM(NMTestsMixIn, unittest.TestCase):
def _deploy_error_graph(self, **kwargs):
sessionId = f"s{random.randint(0, 1000)}"
g = [
Expand All @@ -164,7 +164,7 @@ def _deploy_error_graph(self, **kwargs):
},
{"oid": "C", "type": "plain", "storage": Categories.MEMORY, "producers": ["B"]},
]
dm = self._start_dm(threads=multiprocessing.cpu_count(), **kwargs)
dm = self._start_dm(**kwargs)
dm.createSession(sessionId)
dm.addGraphSpec(sessionId, g)
dm.deploySession(sessionId, ["A"])
Expand Down Expand Up @@ -209,8 +209,7 @@ def _test_runGraphOneDOPerDOM(self, repeats=1):
rels = [DROPRel("B", DROPLinkType.CONSUMER, "A")]
a_data = os.urandom(32)
c_data = str(crc32(a_data, 0)).encode('utf8')
node_managers = [self._start_dm(threads=multiprocessing.cpu_count()) for _ in range(2)]

node_managers = [self._start_dm() for _ in range(2)]
ids = [0] * repeats
for n in range(repeats):
choice = 0
Expand Down Expand Up @@ -254,9 +253,9 @@ def test_runGraphSeveralDropsPerDM(self):
:see: `self.test_runGraphSingleDOPerDOM`
"""
dm1, dm2 = [self._start_dm(threads=multiprocessing.cpu_count()) for _ in range(2)]
dm1, dm2 = [self._start_dm() for _ in range(2)]

sessionId = f"s{random.randint(0, 1000)}"
sessionId = "s1"
g1 = [
{"oid": "A", "type": "plain", "storage": Categories.MEMORY, "consumers": ["C"]},
{"oid": "B", "type": "plain", "storage": Categories.MEMORY},
Expand Down Expand Up @@ -325,7 +324,7 @@ def test_runWithFourDMs(self):
B, F, G, K and N are AppDOs; the rest are plain in-memory DROPs
"""

dm1, dm2, dm3, dm4 = [self._start_dm(threads=multiprocessing.cpu_count()) for _ in range(4)]
dm1, dm2, dm3, dm4 = [self._start_dm() for _ in range(4)]

sessionId = f"s{random.randint(0, 1000)}"
g1 = [memory("A", expectedSize=1)]
Expand Down Expand Up @@ -411,7 +410,7 @@ def test_many_relationships(self):
======= ====================
"""

dm1, dm2 = [self._start_dm(threads=multiprocessing.cpu_count()) for _ in range(2)]
dm1, dm2 = [self._start_dm() for _ in range(2)]

sessionId = f"s{random.randint(0, 1000)}"
N = 100
Expand Down Expand Up @@ -469,7 +468,7 @@ def test_runGraphSeveralDropsPerDM_with_get_consumer_nodes(self):
ip_addr_1 = "8.8.8.8"
ip_addr_2 = "8.8.8.9"

dm1, dm2 = [self._start_dm(threads=multiprocessing.cpu_count()//2) for _ in range(2)]
dm1, dm2 = [self._start_dm() for _ in range(2)]

sessionId = f"s{random.randint(0, 1000)}"
g1 = [
Expand Down Expand Up @@ -585,7 +584,7 @@ def test_run_streaming_consumer_remotely2(self):
self._test_runGraphInTwoNMs(g1, g2, rels, a_data, e_data, leaf_oid="E")


class TestDM(NMTestsMixIn, unittest.TestCase):
class TestDMParallel(NMTestsMixIn, unittest.TestCase):
def _deploy_error_graph(self, **kwargs):
sessionId = f"s{random.randint(0, 1000)}"
g = [
Expand All @@ -598,7 +597,7 @@ def _deploy_error_graph(self, **kwargs):
},
{"oid": "C", "type": "plain", "storage": Categories.MEMORY, "producers": ["B"]},
]
dm = self._start_dm(**kwargs)
dm = self._start_dm(threads=multiprocessing.cpu_count(), **kwargs)
dm.createSession(sessionId)
dm.addGraphSpec(sessionId, g)
dm.deploySession(sessionId, ["A"])
Expand Down Expand Up @@ -643,7 +642,8 @@ def _test_runGraphOneDOPerDOM(self, repeats=1):
rels = [DROPRel("B", DROPLinkType.CONSUMER, "A")]
a_data = os.urandom(32)
c_data = str(crc32(a_data, 0)).encode('utf8')
node_managers = [self._start_dm() for _ in range(2)]
node_managers = [self._start_dm(threads=multiprocessing.cpu_count()) for _ in range(2)]

ids = [0] * repeats
for n in range(repeats):
choice = 0
Expand Down Expand Up @@ -687,9 +687,9 @@ def test_runGraphSeveralDropsPerDM(self):
:see: `self.test_runGraphSingleDOPerDOM`
"""
dm1, dm2 = [self._start_dm() for _ in range(2)]
dm1, dm2 = [self._start_dm(threads=multiprocessing.cpu_count()) for _ in range(2)]

sessionId = "s1"
sessionId = f"s{random.randint(0, 1000)}"
g1 = [
{"oid": "A", "type": "plain", "storage": Categories.MEMORY, "consumers": ["C"]},
{"oid": "B", "type": "plain", "storage": Categories.MEMORY},
Expand Down Expand Up @@ -758,7 +758,7 @@ def test_runWithFourDMs(self):
B, F, G, K and N are AppDOs; the rest are plain in-memory DROPs
"""

dm1, dm2, dm3, dm4 = [self._start_dm() for _ in range(4)]
dm1, dm2, dm3, dm4 = [self._start_dm(threads=multiprocessing.cpu_count()) for _ in range(4)]

sessionId = f"s{random.randint(0, 1000)}"
g1 = [memory("A", expectedSize=1)]
Expand Down Expand Up @@ -844,7 +844,7 @@ def test_many_relationships(self):
======= ====================
"""

dm1, dm2 = [self._start_dm() for _ in range(2)]
dm1, dm2 = [self._start_dm(threads=multiprocessing.cpu_count()) for _ in range(2)]

sessionId = f"s{random.randint(0, 1000)}"
N = 100
Expand Down Expand Up @@ -902,7 +902,7 @@ def test_runGraphSeveralDropsPerDM_with_get_consumer_nodes(self):
ip_addr_1 = "8.8.8.8"
ip_addr_2 = "8.8.8.9"

dm1, dm2 = [self._start_dm() for _ in range(2)]
dm1, dm2 = [self._start_dm(threads=multiprocessing.cpu_count()//2) for _ in range(2)]

sessionId = f"s{random.randint(0, 1000)}"
g1 = [
Expand Down

0 comments on commit cfae921

Please sign in to comment.