Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 14 additions & 25 deletions test/dag_connection_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/usr/bin/env python3

import os
import time
from pathlib import Path

from test_base import TestBase
Expand All @@ -13,13 +12,13 @@ def __init__(self):
self.graph_file_path = (
Path(os.path.dirname(__file__)) / "data" / "ten_semi_unconnected.graphml"
)
self.scenario_timeout = 180 # seconds

def run_test(self):
self.start_server()
try:
self.setup_network()
self.run_connect_dag_scenario()
self.run_connect_dag_scenario_post_connection()
finally:
self.stop_server()

Expand All @@ -31,31 +30,21 @@ def setup_network(self):

def run_connect_dag_scenario(self):
self.log.info("Running connect_dag scenario")
self.log_expected_msgs = [
"Successfully ran the connect_dag.py scenario using a temporary file"
]
self.log_unexpected_msgs = ["Test failed."]
self.warcli("scenarios run-file test/framework_tests/connect_dag.py")
self.wait_for_all_scenarios()
self.assert_log_msgs()

start_time = time.time()
while time.time() - start_time < self.scenario_timeout:
running_scenarios = self.rpc("scenarios_list_running")
if not running_scenarios:
self.log.info("Scenario completed successfully")
return

if len(running_scenarios) == 1 and not running_scenarios[0]["active"]:
self.log.info("Scenario completed successfully")
return

time.sleep(1)

self.log.error(f"Scenario did not complete within {self.scenario_timeout} seconds")
self.stop_running_scenario()
raise AssertionError(f"Scenario timed out after {self.scenario_timeout} seconds")

def stop_running_scenario(self):
running_scenarios = self.rpc("scenarios_list_running")
if running_scenarios:
pid = running_scenarios[0]["pid"]
self.log.warning(f"Stopping scenario with PID {pid}")
self.warcli(f"scenarios stop {pid}", False)
def run_connect_dag_scenario_post_connection(self):
self.log.info("Running connect_dag scenario")
self.log_expected_msgs = ["Successfully ran the connect_dag.py scenario"]
self.log_unexpected_msgs = ["Test failed"]
self.warcli("scenarios run-file test/framework_tests/connect_dag.py")
self.wait_for_all_scenarios()
self.assert_log_msgs()


if __name__ == "__main__":
Expand Down
50 changes: 45 additions & 5 deletions test/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
import logging
import logging.config
import os
import re
import threading
from pathlib import Path
from subprocess import PIPE, STDOUT, Popen, run
from subprocess import PIPE, Popen, run
from tempfile import mkdtemp
from time import sleep

Expand All @@ -20,6 +21,9 @@ def __init__(self):
self.setup_environment()
self.setup_logging()
atexit.register(self.cleanup)
self.log_expected_msgs: None | [str] = None
self.log_unexpected_msgs: None | [str] = None
self.log_msg_assertions_passed = False
self.log.info("Warnet test base initialized")

def setup_environment(self):
Expand Down Expand Up @@ -59,6 +63,19 @@ def cleanup(self, signum=None, frame=None):
self.server_thread.join()
self.server = None

def _print_and_assert_msgs(self, message):
print(message)
if (self.log_expected_msgs or self.log_unexpected_msgs) and assert_log(
message, self.log_expected_msgs, self.log_unexpected_msgs
):
self.log_msg_assertions_passed = True

def assert_log_msgs(self):
assert (
self.log_msg_assertions_passed
), f"Log assertion failed. Expected message not found: {self.log_expected_msgs}"
self.log_msg_assertions_passed = False
Comment on lines +73 to +77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran a test with an unexpected message which aborted early, as expected. Funny thing though, because it exited early the test never got to the part where it prints the expected message. So even though its true the expected message was not found in the log, the more relevant error is the unexpected message which occurred first. I'm not sure if there's anything we really need to change about this, unless you have an idea:

2024-07-15 13:55:25 | INFO    | scenario | ConnectDag PRNG seed is: 131260415370612
Exception in thread Thread-1 (output_reader):
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.12/3.12.3/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/opt/homebrew/Cellar/python@3.12/3.12.3/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/matthewzipkin/Desktop/work/warnet/test/test_base.py", line 103, in output_reader
    func(line)
  File "/Users/matthewzipkin/Desktop/work/warnet/test/test_base.py", line 67, in _print_and_assert_msgs
    if (self.log_expected_msgs or self.log_unexpected_msgs) and assert_log(
                                                                ^^^^^^^^^^^
  File "/Users/matthewzipkin/Desktop/work/warnet/test/test_base.py", line 214, in assert_log
    raise AssertionError(f"Unexpected message found in log: {unexpected_msg}")
AssertionError: Unexpected message found in log: Tank
2024-07-15 09:55:29 | DEBUG   | test     | Executing RPC method: scenarios_list_running
2024-07-15 09:55:29 | INFO    | test     | Stopping network
2024-07-15 09:55:29 | DEBUG   | test     | Executing warcli command: network down
2024-07-15 09:55:30 | DEBUG   | test     | Waiting for predicate with timeout 60s and interval 1s
2024-07-15 09:55:30 | DEBUG   | test     | Executing RPC method: network_status
2024-07-15 09:55:30 | INFO    | test     | Waiting for all tanks to reach 'stopped': {'total': 10, 'stopped': 9, 'running': 1}
2024-07-15 09:55:31 | DEBUG   | test     | Executing RPC method: network_status
2024-07-15 09:55:32 | INFO    | test     | Waiting for all tanks to reach 'stopped': {'total': 10, 'stopped': 10}
Traceback (most recent call last):
  File "/Users/matthewzipkin/Desktop/work/warnet/test/dag_connection_test.py", line 52, in <module>
    test.run_test()
  File "/Users/matthewzipkin/Desktop/work/warnet/test/dag_connection_test.py", line 20, in run_test
    self.run_connect_dag_scenario()
  File "/Users/matthewzipkin/Desktop/work/warnet/test/dag_connection_test.py", line 39, in run_connect_dag_scenario
    self.assert_log_msgs()
  File "/Users/matthewzipkin/Desktop/work/warnet/test/test_base.py", line 75, in assert_log_msgs
    self.log_msg_assertions_passed
AssertionError: Log assertion failed. Expected message not found: ['Successfully ran the connect_dag.py scenario using a temporary file']

Copy link
Collaborator Author

@mplsgrant mplsgrant Jul 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the unexpected message AssertionError gets buried in the log, and the less meaningful message not found AssertionError gets highlighted by virtue of being the last message.

To fix this, I would probably use an enum named "Found": Nothing, Expected, Unexpected, and UnexpectedExpected. Then lift the assertion call out of the assert_log helper function and plug it into the TestBase's asset_log_msgs function. Then I would have the helper assert_log functions return the enum.


def warcli(self, cmd, network=True):
self.log.debug(f"Executing warcli command: {cmd}")
command = ["warcli"] + cmd.split()
Expand Down Expand Up @@ -94,19 +111,19 @@ def start_server(self):
# TODO: check for conflicting warnet process
# maybe also ensure that no conflicting docker networks exist

# For kubernetes we assume the server is started outside test base
# For kubernetes we assume the server is started outside test base,
# but we can still read its log output
self.log.info("Starting Warnet server")
self.server = Popen(
["kubectl", "logs", "-f", "rpc-0"],
["kubectl", "logs", "-f", "rpc-0", "--since=1s"],
stdout=PIPE,
stderr=STDOUT,
stderr=PIPE,
bufsize=1,
universal_newlines=True,
)

self.server_thread = threading.Thread(
target=self.output_reader, args=(self.server.stdout, print)
target=self.output_reader, args=(self.server.stdout, self._print_and_assert_msgs)
)
self.server_thread.daemon = True
self.server_thread.start()
Expand Down Expand Up @@ -176,3 +193,26 @@ def get_scenario_return_code(self, scenario_name):
if len(scns) == 0:
raise Exception(f"Scenario {scenario_name} not found in running scenarios")
return scns[0]["return_code"]


def assert_equal(thing1, thing2, *args):
if thing1 != thing2 or any(thing1 != arg for arg in args):
raise AssertionError(
"not({})".format(" == ".join(str(arg) for arg in (thing1, thing2) + args))
)


def assert_log(log_message, expected_msgs, unexpected_msgs=None) -> bool:
if unexpected_msgs is None:
unexpected_msgs = []
assert_equal(type(expected_msgs), list)
assert_equal(type(unexpected_msgs), list)

found = True
for unexpected_msg in unexpected_msgs:
if re.search(re.escape(unexpected_msg), log_message, flags=re.MULTILINE):
raise AssertionError(f"Unexpected message found in log: {unexpected_msg}")
for expected_msg in expected_msgs:
if re.search(re.escape(expected_msg), log_message, flags=re.MULTILINE) is None:
found = False
return found