diff --git a/cardano_node_tests/utils/parallel_run.py b/cardano_node_tests/utils/parallel_run.py
index 22a5f155c..efe484ae6 100644
--- a/cardano_node_tests/utils/parallel_run.py
+++ b/cardano_node_tests/utils/parallel_run.py
@@ -21,8 +21,8 @@
 LOCK_LOG_FILE = ".lock.log"
 SESSION_RUNNING_FILE = ".session_running"
 TEST_SINGLETON_FILE = ".test_singleton"
-RESOURCE_LOCKED = ".resource_locked"
-RESOURCE_IN_USE = ".resource_in_use"
+RESOURCE_LOCKED_GLOB = ".resource_locked"
+RESOURCE_IN_USE_GLOB = ".resource_in_use"
 RESTART_NEEDED_GLOB = ".needs_restart"
 RESTART_IN_PROGRESS_GLOB = ".restart_in_progress"
 RESTART_AFTER_MARK_GLOB = ".restart_after_mark"
@@ -74,12 +74,14 @@ def cache(self) -> ClusterManagerCache:
         return self.get_cache()
 
     def _log(self, msg: str) -> None:
+        """Log a message; needs to be called while holding the lock."""
         if not self.lock_log:
             return
         with open(self.lock_log, "a") as logfile:
             logfile.write(f"{datetime.datetime.now()} on {self.worker_id}: {msg}\n")
 
     def _locked_log(self, msg: str) -> None:
+        """Log a message; will obtain the lock first."""
         if not self.lock_log:
             return
         with helpers.FileLockIfXdist(self.cluster_lock):
@@ -187,14 +189,14 @@ def on_test_stop(self) -> None:
 
         # remove resource locking files created by the worker
         resource_locking_files = list(
-            self.lock_dir.glob(f"{RESOURCE_LOCKED}_*_{self.worker_id}")
+            self.lock_dir.glob(f"{RESOURCE_LOCKED_GLOB}_*_{self.worker_id}")
         )
         for f in resource_locking_files:
             os.remove(f)
 
         # remove "resource in use" files created by the worker
         resource_in_use_files = list(
-            self.lock_dir.glob(f"{RESOURCE_IN_USE}_*_{self.worker_id}")
+            self.lock_dir.glob(f"{RESOURCE_IN_USE_GLOB}_*_{self.worker_id}")
         )
         for f in resource_in_use_files:
             os.remove(f)
@@ -216,6 +218,11 @@ def get(  # noqa: C901
         use_resources: UnpackableSequence = (),
         cleanup: bool = False,
     ) -> clusterlib.ClusterLib:
+        """Return the `clusterlib.ClusterLib` instance once we can start the test.
+
+        It checks current conditions and waits if the conditions don't allow the test
+        to start right away.
+        """
         # pylint: disable=too-many-statements,too-many-branches
         restart_here = False
         mark_start_here = False
@@ -225,6 +232,7 @@
         test_running_file = self.lock_dir / f"{TEST_RUNNING_GLOB}_{self.worker_id}"
         cluster_obj = self.cache.cluster_obj
 
+        # iterate until it is possible to start the test
         while True:
             if not first_iteration:
                 helpers.xdist_sleep(random.random() * sleep_delay)
@@ -352,11 +360,11 @@
                 if lock_resources:
                     res_usable = False
                     for res in lock_resources:
-                        res_locked = list(self.lock_dir.glob(f"{RESOURCE_LOCKED}_{res}_*"))
+                        res_locked = list(self.lock_dir.glob(f"{RESOURCE_LOCKED_GLOB}_{res}_*"))
                         if res_locked:
                             self._log(f"resource '{res}' locked, cannot start")
                             break
-                        res_used = list(self.lock_dir.glob(f"{RESOURCE_IN_USE}_{res}_*"))
+                        res_used = list(self.lock_dir.glob(f"{RESOURCE_IN_USE_GLOB}_{res}_*"))
                         if res_used:
                             self._log(f"resource '{res}' in use, cannot lock and start")
                             break
@@ -372,19 +380,23 @@
                         "starting and locking"
                     )
 
+                    # create status file for each locked resource
                     _ = [
-                        open(self.lock_dir / f"{RESOURCE_LOCKED}_{r}_{self.worker_id}", "a").close()
+                        open(
+                            self.lock_dir / f"{RESOURCE_LOCKED_GLOB}_{r}_{self.worker_id}", "a"
+                        ).close()
                         for r in lock_resources
                     ]
 
                 # filter out `lock_resources` from the list of `use_resources`
-                use_resources = list(set(use_resources) - set(lock_resources))
+                if use_resources and lock_resources:
+                    use_resources = list(set(use_resources) - set(lock_resources))
 
                 # this test wants to use some resources, check if these are not locked
                 if use_resources:
                     res_locked = []
                     for res in use_resources:
-                        res_locked = list(self.lock_dir.glob(f"{RESOURCE_LOCKED}_{res}_*"))
+                        res_locked = list(self.lock_dir.glob(f"{RESOURCE_LOCKED_GLOB}_{res}_*"))
                         if res_locked:
                             self._log(f"resource '{res}' locked, cannot start")
                             break
@@ -393,8 +405,12 @@
                         no_tests_iteration = 0
                         continue
                     self._log("none of the resources in '{use_resources}' locked, starting")
+
+                    # create status file for each in-use resource
                     _ = [
-                        open(self.lock_dir / f"{RESOURCE_IN_USE}_{r}_{self.worker_id}", "a").close()
+                        open(
+                            self.lock_dir / f"{RESOURCE_IN_USE_GLOB}_{r}_{self.worker_id}", "a"
+                        ).close()
                         for r in use_resources
                     ]
 
@@ -430,6 +446,7 @@
             if not cluster_obj:
                 cluster_obj = clusterlib.ClusterLib(state_dir)
 
+            # `cluster_obj` is ready, we can start the test
            break
 
         return cluster_obj