diff --git a/test/dag_connection_test.py b/test/dag_connection_test.py index feecbf290..dba9b8bfb 100755 --- a/test/dag_connection_test.py +++ b/test/dag_connection_test.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import os -import time from pathlib import Path from test_base import TestBase @@ -13,13 +12,13 @@ def __init__(self): self.graph_file_path = ( Path(os.path.dirname(__file__)) / "data" / "ten_semi_unconnected.graphml" ) - self.scenario_timeout = 180 # seconds def run_test(self): self.start_server() try: self.setup_network() self.run_connect_dag_scenario() + self.run_connect_dag_scenario_post_connection() finally: self.stop_server() @@ -31,31 +30,21 @@ def setup_network(self): def run_connect_dag_scenario(self): self.log.info("Running connect_dag scenario") + self.log_expected_msgs = [ + "Successfully ran the connect_dag.py scenario using a temporary file" + ] + self.log_unexpected_msgs = ["Test failed."] self.warcli("scenarios run-file test/framework_tests/connect_dag.py") + self.wait_for_all_scenarios() + self.assert_log_msgs() - start_time = time.time() - while time.time() - start_time < self.scenario_timeout: - running_scenarios = self.rpc("scenarios_list_running") - if not running_scenarios: - self.log.info("Scenario completed successfully") - return - - if len(running_scenarios) == 1 and not running_scenarios[0]["active"]: - self.log.info("Scenario completed successfully") - return - - time.sleep(1) - - self.log.error(f"Scenario did not complete within {self.scenario_timeout} seconds") - self.stop_running_scenario() - raise AssertionError(f"Scenario timed out after {self.scenario_timeout} seconds") - - def stop_running_scenario(self): - running_scenarios = self.rpc("scenarios_list_running") - if running_scenarios: - pid = running_scenarios[0]["pid"] - self.log.warning(f"Stopping scenario with PID {pid}") - self.warcli(f"scenarios stop {pid}", False) + def run_connect_dag_scenario_post_connection(self): + self.log.info("Running connect_dag scenario") + self.log_expected_msgs = ["Successfully ran the connect_dag.py scenario"] + self.log_unexpected_msgs = ["Test failed"] + self.warcli("scenarios run-file test/framework_tests/connect_dag.py") + self.wait_for_all_scenarios() + self.assert_log_msgs() if __name__ == "__main__": diff --git a/test/test_base.py b/test/test_base.py index 9ba017fb9..2591cb69f 100644 --- a/test/test_base.py +++ b/test/test_base.py @@ -3,9 +3,10 @@ import logging import logging.config import os +import re import threading from pathlib import Path -from subprocess import PIPE, STDOUT, Popen, run +from subprocess import PIPE, Popen, run from tempfile import mkdtemp from time import sleep @@ -20,6 +21,9 @@ def __init__(self): self.setup_environment() self.setup_logging() atexit.register(self.cleanup) + self.log_expected_msgs: None | [str] = None + self.log_unexpected_msgs: None | [str] = None + self.log_msg_assertions_passed = False self.log.info("Warnet test base initialized") def setup_environment(self): @@ -59,6 +63,19 @@ def cleanup(self, signum=None, frame=None): self.server_thread.join() self.server = None + def _print_and_assert_msgs(self, message): + print(message) + if (self.log_expected_msgs or self.log_unexpected_msgs) and assert_log( + message, self.log_expected_msgs, self.log_unexpected_msgs + ): + self.log_msg_assertions_passed = True + + def assert_log_msgs(self): + assert ( + self.log_msg_assertions_passed + ), f"Log assertion failed. Expected message not found: {self.log_expected_msgs}" + self.log_msg_assertions_passed = False + def warcli(self, cmd, network=True): self.log.debug(f"Executing warcli command: {cmd}") command = ["warcli"] + cmd.split() @@ -94,19 +111,19 @@ def start_server(self): # TODO: check for conflicting warnet process # maybe also ensure that no conflicting docker networks exist - # For kubernetes we assume the server is started outside test base + # For kubernetes we assume the server is started outside test base, # but we can still read its log output self.log.info("Starting Warnet server") self.server = Popen( - ["kubectl", "logs", "-f", "rpc-0"], + ["kubectl", "logs", "-f", "rpc-0", "--since=1s"], stdout=PIPE, - stderr=STDOUT, + stderr=PIPE, bufsize=1, universal_newlines=True, ) self.server_thread = threading.Thread( - target=self.output_reader, args=(self.server.stdout, print) + target=self.output_reader, args=(self.server.stdout, self._print_and_assert_msgs) ) self.server_thread.daemon = True self.server_thread.start() @@ -176,3 +193,26 @@ def get_scenario_return_code(self, scenario_name): if len(scns) == 0: raise Exception(f"Scenario {scenario_name} not found in running scenarios") return scns[0]["return_code"] + + +def assert_equal(thing1, thing2, *args): + if thing1 != thing2 or any(thing1 != arg for arg in args): + raise AssertionError( + "not({})".format(" == ".join(str(arg) for arg in (thing1, thing2) + args)) + ) + + +def assert_log(log_message, expected_msgs, unexpected_msgs=None) -> bool: + if unexpected_msgs is None: + unexpected_msgs = [] + assert_equal(type(expected_msgs), list) + assert_equal(type(unexpected_msgs), list) + + found = True + for unexpected_msg in unexpected_msgs: + if re.search(re.escape(unexpected_msg), log_message, flags=re.MULTILINE): + raise AssertionError(f"Unexpected message found in log: {unexpected_msg}") + for expected_msg in expected_msgs: + if re.search(re.escape(expected_msg), log_message, flags=re.MULTILINE) is None: + found = False + return found