diff --git a/daliuge-engine/test/manager/test_dm.py b/daliuge-engine/test/manager/test_dm.py index 835ff8d6a..ed5e76c8d 100644 --- a/daliuge-engine/test/manager/test_dm.py +++ b/daliuge-engine/test/manager/test_dm.py @@ -151,7 +151,7 @@ def _test_runGraphInTwoNMs( return leaf_drop_data -class TestDMParallel(NMTestsMixIn, unittest.TestCase): +class TestDM(NMTestsMixIn, unittest.TestCase): def _deploy_error_graph(self, **kwargs): sessionId = f"s{random.randint(0, 1000)}" g = [ @@ -164,7 +164,7 @@ def _deploy_error_graph(self, **kwargs): }, {"oid": "C", "type": "plain", "storage": Categories.MEMORY, "producers": ["B"]}, ] - dm = self._start_dm(threads=multiprocessing.cpu_count(), **kwargs) + dm = self._start_dm(**kwargs) dm.createSession(sessionId) dm.addGraphSpec(sessionId, g) dm.deploySession(sessionId, ["A"]) @@ -209,8 +209,7 @@ def _test_runGraphOneDOPerDOM(self, repeats=1): rels = [DROPRel("B", DROPLinkType.CONSUMER, "A")] a_data = os.urandom(32) c_data = str(crc32(a_data, 0)).encode('utf8') - node_managers = [self._start_dm(threads=multiprocessing.cpu_count()) for _ in range(2)] - + node_managers = [self._start_dm() for _ in range(2)] ids = [0] * repeats for n in range(repeats): choice = 0 @@ -254,9 +253,9 @@ def test_runGraphSeveralDropsPerDM(self): :see: `self.test_runGraphSingleDOPerDOM` """ - dm1, dm2 = [self._start_dm(threads=multiprocessing.cpu_count()) for _ in range(2)] + dm1, dm2 = [self._start_dm() for _ in range(2)] - sessionId = f"s{random.randint(0, 1000)}" + sessionId = "s1" g1 = [ {"oid": "A", "type": "plain", "storage": Categories.MEMORY, "consumers": ["C"]}, {"oid": "B", "type": "plain", "storage": Categories.MEMORY}, @@ -325,7 +324,7 @@ def test_runWithFourDMs(self): B, F, G, K and N are AppDOs; the rest are plain in-memory DROPs """ - dm1, dm2, dm3, dm4 = [self._start_dm(threads=multiprocessing.cpu_count()) for _ in range(4)] + dm1, dm2, dm3, dm4 = [self._start_dm() for _ in range(4)] sessionId = f"s{random.randint(0, 1000)}" g1 = [memory("A", expectedSize=1)] @@ -411,7 +410,7 @@ def test_many_relationships(self): ======= ==================== """ - dm1, dm2 = [self._start_dm(threads=multiprocessing.cpu_count()) for _ in range(2)] + dm1, dm2 = [self._start_dm() for _ in range(2)] sessionId = f"s{random.randint(0, 1000)}" N = 100 @@ -469,7 +468,7 @@ def test_runGraphSeveralDropsPerDM_with_get_consumer_nodes(self): ip_addr_1 = "8.8.8.8" ip_addr_2 = "8.8.8.9" - dm1, dm2 = [self._start_dm(threads=multiprocessing.cpu_count()//2) for _ in range(2)] + dm1, dm2 = [self._start_dm() for _ in range(2)] sessionId = f"s{random.randint(0, 1000)}" g1 = [ @@ -585,7 +584,7 @@ def test_run_streaming_consumer_remotely2(self): self._test_runGraphInTwoNMs(g1, g2, rels, a_data, e_data, leaf_oid="E") -class TestDM(NMTestsMixIn, unittest.TestCase): +class TestDMParallel(NMTestsMixIn, unittest.TestCase): def _deploy_error_graph(self, **kwargs): sessionId = f"s{random.randint(0, 1000)}" g = [ @@ -598,7 +597,7 @@ def _deploy_error_graph(self, **kwargs): }, {"oid": "C", "type": "plain", "storage": Categories.MEMORY, "producers": ["B"]}, ] - dm = self._start_dm(**kwargs) + dm = self._start_dm(threads=multiprocessing.cpu_count(), **kwargs) dm.createSession(sessionId) dm.addGraphSpec(sessionId, g) dm.deploySession(sessionId, ["A"]) @@ -643,7 +642,8 @@ def _test_runGraphOneDOPerDOM(self, repeats=1): rels = [DROPRel("B", DROPLinkType.CONSUMER, "A")] a_data = os.urandom(32) c_data = str(crc32(a_data, 0)).encode('utf8') - node_managers = [self._start_dm() for _ in range(2)] + node_managers = [self._start_dm(threads=multiprocessing.cpu_count()) for _ in range(2)] + ids = [0] * repeats for n in range(repeats): choice = 0 @@ -687,9 +687,9 @@ def test_runGraphSeveralDropsPerDM(self): :see: `self.test_runGraphSingleDOPerDOM` """ - dm1, dm2 = [self._start_dm() for _ in range(2)] + dm1, dm2 = [self._start_dm(threads=multiprocessing.cpu_count()) for _ in range(2)] - sessionId = "s1" + sessionId = f"s{random.randint(0, 1000)}" g1 = [ {"oid": "A", "type": "plain", "storage": Categories.MEMORY, "consumers": ["C"]}, {"oid": "B", "type": "plain", "storage": Categories.MEMORY}, @@ -758,7 +758,7 @@ def test_runWithFourDMs(self): B, F, G, K and N are AppDOs; the rest are plain in-memory DROPs """ - dm1, dm2, dm3, dm4 = [self._start_dm() for _ in range(4)] + dm1, dm2, dm3, dm4 = [self._start_dm(threads=multiprocessing.cpu_count()) for _ in range(4)] sessionId = f"s{random.randint(0, 1000)}" g1 = [memory("A", expectedSize=1)] @@ -844,7 +844,7 @@ def test_many_relationships(self): ======= ==================== """ - dm1, dm2 = [self._start_dm() for _ in range(2)] + dm1, dm2 = [self._start_dm(threads=multiprocessing.cpu_count()) for _ in range(2)] sessionId = f"s{random.randint(0, 1000)}" N = 100 @@ -902,7 +902,7 @@ def test_runGraphSeveralDropsPerDM_with_get_consumer_nodes(self): ip_addr_1 = "8.8.8.8" ip_addr_2 = "8.8.8.9" - dm1, dm2 = [self._start_dm() for _ in range(2)] + dm1, dm2 = [self._start_dm(threads=multiprocessing.cpu_count()//2) for _ in range(2)] sessionId = f"s{random.randint(0, 1000)}" g1 = [