diff --git a/src/backend/kubernetes_backend.py b/src/backend/kubernetes_backend.py index 539e28a52..277a72fca 100644 --- a/src/backend/kubernetes_backend.py +++ b/src/backend/kubernetes_backend.py @@ -16,7 +16,7 @@ from kubernetes.dynamic import DynamicClient from kubernetes.dynamic.exceptions import NotFoundError, ResourceNotFoundError from kubernetes.stream import stream -from warnet.services import ServiceType, services +from warnet.services import SERVICES, ServiceType from warnet.status import RunningStatus from warnet.tank import Tank from warnet.utils import parse_raw_messages @@ -81,14 +81,17 @@ def down(self, warnet) -> bool: self.remove_prometheus_service_monitors(warnet.tanks) for service_name in warnet.services: - self.client.delete_namespaced_pod( - self.get_service_pod_name(services[service_name]["container_name_suffix"]), - self.namespace, - ) - self.client.delete_namespaced_service( - self.get_service_service_name(services[service_name]["container_name_suffix"]), - self.namespace, - ) + try: + self.client.delete_namespaced_pod( + self.get_service_pod_name(SERVICES[service_name]["container_name_suffix"]), + self.namespace, + ) + self.client.delete_namespaced_service( + self.get_service_service_name(SERVICES[service_name]["container_name_suffix"]), + self.namespace, + ) + except Exception as e: + self.log.error(f"Could not delete service: {service_name}:\n{e}") return True @@ -724,7 +727,10 @@ def deploy_pods(self, warnet): self.apply_prometheus_service_monitors(warnet.tanks) for service_name in warnet.services: - self.service_from_json(services[service_name]) + try: + self.service_from_json(SERVICES[service_name]) + except Exception as e: + self.log.error(f"Error starting service: {service_name}\n{e}") self.log.debug("Containers and services created. Configuring IP addresses") # now that the pods have had a second to create, @@ -738,7 +744,7 @@ def deploy_pods(self, warnet): pod_name = self.get_pod_name(tank.index, ServiceType.BITCOIN) pod = self.get_pod(pod_name) if pod is None or pod.status is None or getattr(pod.status, "pod_ip", None) is None: - print("Waiting for pod response or pod IP...") + self.log.info("Waiting for pod response or pod IP...") time.sleep(3) continue pod_ip = pod.status.pod_ip @@ -831,7 +837,7 @@ def service_from_json(self, obj): self.client.create_namespaced_service(namespace=self.namespace, body=service_service) def write_service_config(self, source_path: str, service_name: str, destination_path: str): - obj = services[service_name] + obj = SERVICES[service_name] container_name = "sidecar" # Copy the archive from our local drive (Warnet RPC container/pod) # to the destination service's sidecar container via ssh diff --git a/src/logging_config/config.json b/src/logging_config/config.json index bdbfc8cb7..2892af32f 100644 --- a/src/logging_config/config.json +++ b/src/logging_config/config.json @@ -3,12 +3,12 @@ "disable_existing_loggers": false, "formatters": { "simple": { - "format": " %(asctime)s - %(levelname)s - %(message)s", - "datefmt": "%Y-%m-%dT%H:%M:%S%z" + "format": "%(asctime)s │ %(levelname)-7s │ %(name)-17s │ %(message)s", + "datefmt": "%Y-%m-%d %H:%M:%S" }, "detailed": { - "format": " %(asctime)s - %(levelname)s - [%(module)s|L%(lineno)d] - %(message)s", - "datefmt": "%Y-%m-%dT%H:%M:%S%z" + "format": "%(asctime)s │ %(levelname)-7s │ [%(module)21s:%(lineno)4d] │ %(message)s", + "datefmt": "%Y-%m-%d %H:%M:%S" } }, "filters": { diff --git a/src/warnet/server.py b/src/warnet/server.py index 8df352c46..bc77a10a5 100644 --- a/src/warnet/server.py +++ b/src/warnet/server.py @@ -95,16 +95,10 @@ def healthy(self): def setup_logging(self): os.makedirs(os.path.dirname(self.log_file_path), exist_ok=True) - with open(LOGGING_CONFIG_PATH) as f: logging_config = json.load(f) - - # Update log file path logging_config["handlers"]["file"]["filename"] = str(self.log_file_path) - - # Apply the config logging.config.dictConfig(logging_config) - self.logger = logging.getLogger("warnet") self.logger.info("Logging started") diff --git a/src/warnet/services.py b/src/warnet/services.py index 0b95d8c68..562813432 100644 --- a/src/warnet/services.py +++ b/src/warnet/services.py @@ -12,7 +12,7 @@ class ServiceType(Enum): CIRCUITBREAKER = 3 -services = { +SERVICES = { # "forkobserver": { # "image": "b10c/fork-observer:latest", # "container_name_suffix": "fork-observer", diff --git a/src/warnet/test_framework_bridge.py b/src/warnet/test_framework_bridge.py index 3897571e4..02cd32a95 100644 --- a/src/warnet/test_framework_bridge.py +++ b/src/warnet/test_framework_bridge.py @@ -48,16 +48,14 @@ def handle_sigterm(self, signum, frame): def setup(self): signal.signal(signal.SIGTERM, self.handle_sigterm) + + # Must setuup warnet first to avoid double formatting + self.warnet = Warnet.from_network(self.options.network) # hacked from _start_logging() # Scenarios will log plain messages to stdout only, which will can redirected by warnet - self.log = logging.getLogger() + self.log = logging.getLogger("WarnetTestFramework") self.log.setLevel(logging.INFO) # set this to DEBUG to see ALL RPC CALLS - ch = logging.StreamHandler(sys.stdout) - formatter = logging.Formatter(fmt="%(message)s") - ch.setFormatter(formatter) - self.log.addHandler(ch) - self.warnet = Warnet.from_network(self.options.network) for i, tank in enumerate(self.warnet.tanks): ip = tank.ipv4 self.log.info(f"Adding TestNode {i} from tank {tank.index} with IP {ip}") diff --git a/test/build_branch_test.py b/test/build_branch_test.py index 0a2773bd6..f239e9811 100755 --- a/test/build_branch_test.py +++ b/test/build_branch_test.py @@ -6,30 +6,52 @@ from test_base import TestBase -graph_file_path = Path(os.path.dirname(__file__)) / "data" / "build_v24_test.graphml" -base = TestBase() -base.start_server() -print(base.warcli(f"network start {graph_file_path}")) -base.wait_for_all_tanks_status(target="running", timeout=10 * 60) -base.wait_for_all_edges() - -print("\nWait for p2p connections") - - -def check_peers(): - info0 = json.loads(base.warcli("rpc 0 getpeerinfo")) - info1 = json.loads(base.warcli("rpc 1 getpeerinfo")) - print(f"Waiting for both nodes to get one peer: node0: {len(info0)}, node1: {len(info1)}") - return len(info0) == 1 and len(info1) == 1 - - -base.wait_for_predicate(check_peers) - -print("\nCheck build flags were processed") -release_help = base.get_tank(0).exec("bitcoind -h") -build_help = base.get_tank(1).exec("bitcoind -h") -assert "zmqpubhashblock" in release_help -assert "zmqpubhashblock" not in build_help - -base.stop_server() +class BuildBranchTest(TestBase): + def __init__(self): + super().__init__() + self.graph_file_path = Path(os.path.dirname(__file__)) / "data" / "build_v24_test.graphml" + + def run_test(self): + self.start_server() + try: + self.setup_network() + self.wait_for_p2p_connections() + self.check_build_flags() + finally: + self.stop_server() + + def setup_network(self): + self.log.info("Setting up network") + self.log.info(self.warcli(f"network start {self.graph_file_path}")) + self.wait_for_all_tanks_status(target="running", timeout=10 * 60) + self.wait_for_all_edges() + + def wait_for_p2p_connections(self): + self.log.info("Waiting for P2P connections") + self.wait_for_predicate(self.check_peers, timeout=5 * 60) + + def check_peers(self): + info0 = json.loads(self.warcli("rpc 0 getpeerinfo")) + info1 = json.loads(self.warcli("rpc 1 getpeerinfo")) + self.log.debug( + f"Waiting for both nodes to get one peer: node0: {len(info0)}, node1: {len(info1)}" + ) + return len(info0) == 1 and len(info1) == 1 + + def check_build_flags(self): + self.log.info("Checking build flags") + release_help = self.get_tank(0).exec("bitcoind -h") + build_help = self.get_tank(1).exec("bitcoind -h") + + assert "zmqpubhashblock" in release_help, "zmqpubhashblock not found in release help" + assert ( + "zmqpubhashblock" not in build_help + ), "zmqpubhashblock found in build help, but it shouldn't be" + + self.log.info("Build flags check passed") + + +if __name__ == "__main__": + test = BuildBranchTest() + test.run_test() diff --git a/test/dag_connection_test.py b/test/dag_connection_test.py index 8153fb699..feecbf290 100755 --- a/test/dag_connection_test.py +++ b/test/dag_connection_test.py @@ -6,29 +6,58 @@ from test_base import TestBase -graph_file_path = Path(os.path.dirname(__file__)) / "data" / "ten_semi_unconnected.graphml" - -base = TestBase() - -base.start_server() -print(base.warcli(f"network start {graph_file_path}")) -base.wait_for_all_tanks_status(target="running") -base.wait_for_all_edges() - -# Start scenario -base.warcli("scenarios run-file test/framework_tests/connect_dag.py") - -counter = 0 -seconds = 180 -while ( - len(base.rpc("scenarios_list_running")) == 1 and base.rpc("scenarios_list_running")[0]["active"] -): - time.sleep(1) - counter += 1 - if counter > seconds: - pid = base.rpc("scenarios_list_running")[0]["pid"] - base.warcli(f"scenarios stop {pid}", False) - print(f"{os.path.basename(__file__)} more than {seconds} seconds") - assert counter < seconds - -base.stop_server() + +class DAGConnectionTest(TestBase): + def __init__(self): + super().__init__() + self.graph_file_path = ( + Path(os.path.dirname(__file__)) / "data" / "ten_semi_unconnected.graphml" + ) + self.scenario_timeout = 180 # seconds + + def run_test(self): + self.start_server() + try: + self.setup_network() + self.run_connect_dag_scenario() + finally: + self.stop_server() + + def setup_network(self): + self.log.info("Setting up network") + self.log.info(self.warcli(f"network start {self.graph_file_path}")) + self.wait_for_all_tanks_status(target="running") + self.wait_for_all_edges() + + def run_connect_dag_scenario(self): + self.log.info("Running connect_dag scenario") + self.warcli("scenarios run-file test/framework_tests/connect_dag.py") + + start_time = time.time() + while time.time() - start_time < self.scenario_timeout: + running_scenarios = self.rpc("scenarios_list_running") + if not running_scenarios: + self.log.info("Scenario completed successfully") + return + + if len(running_scenarios) == 1 and not running_scenarios[0]["active"]: + self.log.info("Scenario completed successfully") + return + + time.sleep(1) + + self.log.error(f"Scenario did not complete within {self.scenario_timeout} seconds") + self.stop_running_scenario() + raise AssertionError(f"Scenario timed out after {self.scenario_timeout} seconds") + + def stop_running_scenario(self): + running_scenarios = self.rpc("scenarios_list_running") + if running_scenarios: + pid = running_scenarios[0]["pid"] + self.log.warning(f"Stopping scenario with PID {pid}") + self.warcli(f"scenarios stop {pid}", False) + + +if __name__ == "__main__": + test = DAGConnectionTest() + test.run_test() diff --git a/test/data/services.graphml b/test/data/services.graphml index b3195ad6a..7b7ffab7b 100644 --- a/test/data/services.graphml +++ b/test/data/services.graphml @@ -15,7 +15,7 @@ - cadvisor forkobserver addrmanobserver grafana nodeexporter prometheus + 27.0 -uacomment=w0 -debug=validation diff --git a/test/ln_test.py b/test/ln_test.py index 672d0f332..cbbbf990c 100755 --- a/test/ln_test.py +++ b/test/ln_test.py @@ -7,99 +7,118 @@ from test_base import TestBase from warnet.services import ServiceType -graph_file_path = Path(os.path.dirname(__file__)) / "data" / "ln.graphml" -base = TestBase() -base.start_server() - - -def get_cb_forwards(index): - cmd = "wget -q -O - 127.0.0.1:9235/api/forwarding_history" - res = base.wait_for_rpc( - "exec_run", [index, ServiceType.CIRCUITBREAKER.value, cmd, base.network_name] - ) - return json.loads(res) - - -print(base.warcli(f"network start {graph_file_path}")) -base.wait_for_all_tanks_status(target="running") -base.wait_for_all_edges() - -print("\nRunning LN Init scenario") -base.warcli("rpc 0 getblockcount") -base.warcli("scenarios run ln_init") -base.wait_for_all_scenarios() - -node2pub, node2host = json.loads(base.warcli("lncli 2 getinfo"))["uris"][0].split("@") - -print("\nEnsuring node-level channel policy settings") -chan_id = json.loads(base.warcli("lncli 2 listchannels"))["channels"][0]["chan_id"] -chan = json.loads(base.warcli(f"lncli 2 getchaninfo {chan_id}")) -# node_1 or node_2 is tank 2 with its non-default --bitcoin.timelockdelta=33 -if chan["node1_policy"]["time_lock_delta"] != 33: - assert chan["node2_policy"]["time_lock_delta"] == 33 - -print("\nEnsuring no circuit breaker forwards yet") -assert len(get_cb_forwards(1)["forwards"]) == 0 - -print("\nTest LN payment from 0 -> 2") -inv = json.loads(base.warcli("lncli 2 addinvoice --amt=2000"))["payment_request"] - -print(f"\nGot invoice from node 2: {inv}") -print("\nPaying invoice from node 0...") -print(base.warcli(f"lncli 0 payinvoice -f {inv}")) - -print("Waiting for payment success") - - -def check_invoices(): - invs = json.loads(base.warcli("lncli 2 listinvoices"))["invoices"] - if len(invs) > 0 and invs[0]["state"] == "SETTLED": - print("\nSettled!") - return True - else: +class LNTest(TestBase): + def __init__(self): + super().__init__() + self.graph_file_path = Path(os.path.dirname(__file__)) / "data" / "ln.graphml" + + def run_test(self): + self.start_server() + try: + self.setup_network() + self.run_ln_init_scenario() + self.test_channel_policies() + self.test_ln_payment_0_to_2() + self.test_ln_payment_2_to_0() + self.test_simln() + finally: + self.stop_server() + + def setup_network(self): + self.log.info("Setting up network") + self.log.info(self.warcli(f"network start {self.graph_file_path}")) + self.wait_for_all_tanks_status(target="running") + self.wait_for_all_edges() + + def get_cb_forwards(self, index): + cmd = "wget -q -O - 127.0.0.1:9235/api/forwarding_history" + res = self.wait_for_rpc( + "exec_run", [index, ServiceType.CIRCUITBREAKER.value, cmd, self.network_name] + ) + return json.loads(res) + + def run_ln_init_scenario(self): + self.log.info("Running LN Init scenario") + self.warcli("rpc 0 getblockcount") + self.warcli("scenarios run ln_init") + self.wait_for_all_scenarios() + + def test_channel_policies(self): + self.log.info("Ensuring node-level channel policy settings") + node2pub, node2host = json.loads(self.warcli("lncli 2 getinfo"))["uris"][0].split("@") + chan_id = json.loads(self.warcli("lncli 2 listchannels"))["channels"][0]["chan_id"] + chan = json.loads(self.warcli(f"lncli 2 getchaninfo {chan_id}")) + + # node_1 or node_2 is tank 2 with its non-default --bitcoin.timelockdelta=33 + if chan["node1_policy"]["time_lock_delta"] != 33: + assert ( + chan["node2_policy"]["time_lock_delta"] == 33 + ), "Expected time_lock_delta to be 33" + + self.log.info("Ensuring no circuit breaker forwards yet") + assert len(self.get_cb_forwards(1)["forwards"]) == 0, "Expected no circuit breaker forwards" + + def test_ln_payment_0_to_2(self): + self.log.info("Test LN payment from 0 -> 2") + inv = json.loads(self.warcli("lncli 2 addinvoice --amt=2000"))["payment_request"] + self.log.info(f"Got invoice from node 2: {inv}") + self.log.info("Paying invoice from node 0...") + self.log.info(self.warcli(f"lncli 0 payinvoice -f {inv}")) + + self.wait_for_predicate(self.check_invoice_settled) + + self.log.info("Ensuring channel-level channel policy settings: source") + payment = json.loads(self.warcli("lncli 0 listpayments"))["payments"][0] + assert ( + payment["fee_msat"] == "5506" + ), f"Expected fee_msat to be 5506, got {payment['fee_msat']}" + + self.log.info("Ensuring circuit breaker tracked payment") + assert len(self.get_cb_forwards(1)["forwards"]) == 1, "Expected one circuit breaker forward" + + def test_ln_payment_2_to_0(self): + self.log.info("Test LN payment from 2 -> 0") + inv = json.loads(self.warcli("lncli 0 addinvoice --amt=1000"))["payment_request"] + self.log.info(f"Got invoice from node 0: {inv}") + self.log.info("Paying invoice from node 2...") + self.log.info(self.warcli(f"lncli 2 payinvoice -f {inv}")) + + self.wait_for_predicate(lambda: self.check_invoices(0) == 1) + + self.log.info("Ensuring channel-level channel policy settings: target") + payment = json.loads(self.warcli("lncli 2 listpayments"))["payments"][0] + assert ( + payment["fee_msat"] == "2213" + ), f"Expected fee_msat to be 2213, got {payment['fee_msat']}" + + def test_simln(self): + self.log.info("Engaging simln") + node2pub, _ = json.loads(self.warcli("lncli 2 getinfo"))["uris"][0].split("@") + activity = [ + {"source": "ln-0", "destination": node2pub, "interval_secs": 1, "amount_msat": 2000} + ] + self.warcli( + f"network export --exclude=[1] --activity={json.dumps(activity).replace(' ', '')}" + ) + self.wait_for_predicate(lambda: self.check_invoices(2) > 1) + assert self.check_invoices(0) == 1, "Expected one invoice for node 0" + assert self.check_invoices(1) == 0, "Expected no invoices for node 1" + + def check_invoice_settled(self): + invs = json.loads(self.warcli("lncli 2 listinvoices"))["invoices"] + if len(invs) > 0 and invs[0]["state"] == "SETTLED": + self.log.info("Invoice settled") + return True return False + def check_invoices(self, index): + invs = json.loads(self.warcli(f"lncli {index} listinvoices"))["invoices"] + settled = sum(1 for inv in invs if inv["state"] == "SETTLED") + self.log.debug(f"Node {index} has {settled} settled invoices") + return settled -base.wait_for_predicate(check_invoices) - -print("\nEnsuring channel-level channel policy settings: source") -payment = json.loads(base.warcli("lncli 0 listpayments"))["payments"][0] -assert payment["fee_msat"] == "5506" - -print("\nEnsuring circuit breaker tracked payment") -assert len(get_cb_forwards(1)["forwards"]) == 1 - -print("\nTest LN payment from 2 -> 0") -inv = json.loads(base.warcli("lncli 0 addinvoice --amt=1000"))["payment_request"] - -print(f"\nGot invoice from node 0: {inv}") -print("\nPaying invoice from node 2...") -print(base.warcli(f"lncli 2 payinvoice -f {inv}")) - -print("Waiting for payment success") - - -def check_invoices(index): - invs = json.loads(base.warcli(f"lncli {index} listinvoices"))["invoices"] - settled = 0 - for inv in invs: - if inv["state"] == "SETTLED": - settled += 1 - return settled - - -base.wait_for_predicate(lambda: check_invoices(0) == 1) - -print("\nEnsuring channel-level channel policy settings: target") -payment = json.loads(base.warcli("lncli 2 listpayments"))["payments"][0] -assert payment["fee_msat"] == "2213" - -print("\nEngaging simln") -activity = [{"source": "ln-0", "destination": node2pub, "interval_secs": 1, "amount_msat": 2000}] -base.warcli(f"network export --exclude=[1] --activity={json.dumps(activity).replace(' ', '')}") -base.wait_for_predicate(lambda: check_invoices(2) > 1) -assert check_invoices(0) == 1 -assert check_invoices(1) == 0 -base.stop_server() +if __name__ == "__main__": + test = LNTest() + test.run_test() diff --git a/test/onion_test.py b/test/onion_test.py new file mode 100755 index 000000000..250fc4c85 --- /dev/null +++ b/test/onion_test.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 + +import json +import os +from pathlib import Path + +from test_base import TestBase + + +class OnionTest(TestBase): + def __init__(self): + super().__init__() + self.graph_file_path = Path(os.path.dirname(__file__)) / "data" / "12_node_ring.graphml" + self.onion_addr = None + + def run_test(self): + self.start_server() + try: + self.setup_network() + self.test_reachability() + self.test_onion_peer_connection() + finally: + self.stop_server() + + def setup_network(self): + self.log.info("Setting up network") + self.log.info(self.warcli(f"network start {self.graph_file_path}")) + self.wait_for_all_tanks_status(target="running") + self.wait_for_all_edges() + + def test_reachability(self): + self.log.info("Checking IPv4 and onion reachability") + self.wait_for_predicate(self.check_reachability, timeout=10 * 60) + + def check_reachability(self): + try: + info = json.loads(self.warcli("rpc 0 getnetworkinfo")) + for net in info["networks"]: + if net["name"] == "ipv4" and not net["reachable"]: + return False + if net["name"] == "onion" and not net["reachable"]: + return False + if len(info["localaddresses"]) != 2: + return False + for addr in info["localaddresses"]: + assert "100." in addr["address"] or ".onion" in addr["address"] + if ".onion" in addr["address"]: + self.onion_addr = addr["address"] + return True + except Exception as e: + self.log.error(f"Error checking reachability: {e}") + return False + + def test_onion_peer_connection(self): + self.log.info("Attempting addnode to onion peer") + self.warcli(f"rpc 1 addnode {self.onion_addr} add") + # Might take up to 10 minutes + self.wait_for_predicate(self.check_onion_peer, timeout=10 * 60) + + def check_onion_peer(self): + peers = json.loads(self.warcli("rpc 0 getpeerinfo")) + for peer in peers: + self.log.debug(f"Checking peer: {peer['network']} {peer['addr']}") + if peer["network"] == "onion": + return True + return False + + +if __name__ == "__main__": + test = OnionTest() + test.run_test() diff --git a/test/rpc_test.py b/test/rpc_test.py index 14492cdc7..133415b45 100755 --- a/test/rpc_test.py +++ b/test/rpc_test.py @@ -6,51 +6,70 @@ from test_base import TestBase -graph_file_path = Path(os.path.dirname(__file__)) / "data" / "12_node_ring.graphml" -base = TestBase() -base.start_server() -print(base.warcli(f"network start {graph_file_path}")) -base.wait_for_all_tanks_status(target="running") -base.wait_for_all_edges() - -# Exponential backoff will repeat this command until it succeeds. -# That's when we are ready for commands -base.warcli("rpc 0 getblockcount") - -# Fund wallet -base.warcli("rpc 1 createwallet miner") -base.warcli("rpc 1 -generate 101") - -base.wait_for_predicate(lambda: "101" in base.warcli("rpc 0 getblockcount")) - -txid = base.warcli("rpc 1 sendtoaddress bcrt1qthmht0k2qnh3wy7336z05lu2km7emzfpm3wg46 0.1") - -base.wait_for_predicate(lambda: txid in base.warcli("rpc 0 getrawmempool")) - -node_log = base.warcli("debug-log 1") -assert txid in node_log - -all_logs = base.warcli(f"grep-logs {txid}") -count = all_logs.count("Enqueuing TransactionAddedToMempool") -# should be at least more than one node -assert count > 1 - -msgs = base.warcli("messages 0 1") -assert "verack" in msgs - - -def got_addrs(): - addrman = json.loads(base.warcli("rpc 0 getrawaddrman")) - for key in ["tried", "new"]: - obj = addrman[key] - keys = list(obj.keys()) - groups = [g.split("/")[0] for g in keys] - if len(set(groups)) > 1: - return True - return False - - -base.wait_for_predicate(got_addrs) - -base.stop_server() +class RPCTest(TestBase): + def __init__(self): + super().__init__() + self.graph_file_path = Path(os.path.dirname(__file__)) / "data" / "12_node_ring.graphml" + + def run_test(self): + self.start_server() + try: + self.setup_network() + self.test_rpc_commands() + self.test_transaction_propagation() + self.test_message_exchange() + self.test_address_manager() + finally: + self.stop_server() + + def setup_network(self): + self.log.info("Setting up network") + self.log.info(self.warcli(f"network start {self.graph_file_path}")) + self.wait_for_all_tanks_status(target="running") + self.wait_for_all_edges() + + def test_rpc_commands(self): + self.log.info("Testing basic RPC commands") + self.warcli("rpc 0 getblockcount") + self.warcli("rpc 1 createwallet miner") + self.warcli("rpc 1 -generate 101") + self.wait_for_predicate(lambda: "101" in self.warcli("rpc 0 getblockcount")) + + def test_transaction_propagation(self): + self.log.info("Testing transaction propagation") + address = "bcrt1qthmht0k2qnh3wy7336z05lu2km7emzfpm3wg46" + txid = self.warcli(f"rpc 1 sendtoaddress {address} 0.1") + self.wait_for_predicate(lambda: txid in self.warcli("rpc 0 getrawmempool")) + + node_log = self.warcli("debug-log 1") + assert txid in node_log, "Transaction ID not found in node log" + + all_logs = self.warcli(f"grep-logs {txid}") + count = all_logs.count("Enqueuing TransactionAddedToMempool") + assert count > 1, f"Transaction not propagated to enough nodes (count: {count})" + + def test_message_exchange(self): + self.log.info("Testing message exchange between nodes") + msgs = self.warcli("messages 0 1") + assert "verack" in msgs, "VERACK message not found in exchange" + + def test_address_manager(self): + self.log.info("Testing address manager") + + def got_addrs(): + addrman = json.loads(self.warcli("rpc 0 getrawaddrman")) + for key in ["tried", "new"]: + obj = addrman[key] + keys = list(obj.keys()) + groups = [g.split("/")[0] for g in keys] + if len(set(groups)) > 1: + return True + return False + + self.wait_for_predicate(got_addrs) + + +if __name__ == "__main__": + test = RPCTest() + test.run_test() diff --git a/test/scenarios_test.py b/test/scenarios_test.py index 445da2615..9c6af559c 100755 --- a/test/scenarios_test.py +++ b/test/scenarios_test.py @@ -5,48 +5,73 @@ from test_base import TestBase -graph_file_path = Path(os.path.dirname(__file__)) / "data" / "12_node_ring.graphml" -base = TestBase() -base.start_server() -print(base.warcli(f"network start {graph_file_path}")) -base.wait_for_all_tanks_status(target="running") - -# Use rpc instead of warcli so we get raw JSON object -scenarios = base.rpc("scenarios_available") -assert len(scenarios) == 4 - -# Start scenario -base.warcli("scenarios run miner_std --allnodes --interval=1") - - -def check_blocks(): - # Ensure the scenario is still working - running = base.rpc("scenarios_list_running") - assert len(running) == 1 - assert running[0]["active"] - assert "miner_std" in running[0]["cmd"] - - count = int(base.warcli("rpc 0 getblockcount")) - print(f"Waiting for 30 blocks: {count}") - return count >= 30 - - -base.wait_for_predicate(check_blocks) - -# Stop scenario -running = base.rpc("scenarios_list_running") -assert len(running) == 1 -assert running[0]["active"] -base.warcli(f"scenarios stop {running[0]['pid']}", False) - - -def check_stop(): - running = base.rpc("scenarios_list_running") - print(f"Waiting for scenario to stop: {running}") - return len(running) == 0 - - -base.wait_for_predicate(check_stop) - -base.stop_server() +class ScenariosTest(TestBase): + def __init__(self): + super().__init__() + self.graph_file_path = Path(os.path.dirname(__file__)) / "data" / "12_node_ring.graphml" + + def run_test(self): + try: + self.start_server() + self.setup_network() + self.test_scenarios() + finally: + self.stop_server() + + def setup_network(self): + self.log.info("Setting up network") + self.log.info(self.warcli(f"network start {self.graph_file_path}")) + self.wait_for_all_tanks_status(target="running") + + def test_scenarios(self): + self.check_available_scenarios() + self.run_and_check_scenario("miner_std") + self.run_and_check_scenario_from_file("src/scenarios/miner_std.py") + + def check_available_scenarios(self): + self.log.info("Checking available scenarios") + # Use rpc instead of warcli so we get raw JSON object + scenarios = self.rpc("scenarios_available") + assert len(scenarios) == 4, f"Expected 4 available scenarios, got {len(scenarios)}" + self.log.info(f"Found {len(scenarios)} available scenarios") + + def run_and_check_scenario(self, scenario_name): + self.log.info(f"Running scenario: {scenario_name}") + self.warcli(f"scenarios run {scenario_name} --allnodes --interval=1") + self.wait_for_predicate(lambda: self.check_blocks(30)) + self.stop_scenario() + + def run_and_check_scenario_from_file(self, scenario_file): + self.log.info(f"Running scenario from file: {scenario_file}") + self.warcli(f"scenarios run-file {scenario_file} --allnodes --interval=1") + start = int(self.warcli("rpc 0 getblockcount")) + self.wait_for_predicate(lambda: self.check_blocks(2, start=start)) + self.stop_scenario() + + def check_blocks(self, target_blocks, start: int = 0): + running = self.rpc("scenarios_list_running") + assert len(running) == 1, f"Expected one running scenario, got {len(running)}" + assert running[0]["active"], "Scenario should be active" + + count = int(self.warcli("rpc 0 getblockcount")) + self.log.debug(f"Current block count: {count}, target: {start + target_blocks}") + return count >= start + target_blocks + + def stop_scenario(self): + self.log.info("Stopping running scenario") + running = self.rpc("scenarios_list_running") + assert len(running) == 1, f"Expected one running scenario, got {len(running)}" + assert running[0]["active"], "Scenario should be active" + self.warcli(f"scenarios stop {running[0]['pid']}", False) + self.wait_for_predicate(self.check_scenario_stopped) + + def check_scenario_stopped(self): + running = self.rpc("scenarios_list_running") + self.log.debug(f"Checking if scenario stopped. Running scenarios: {len(running)}") + return len(running) == 0 + + +if __name__ == "__main__": + test = ScenariosTest() + test.run_test() diff --git a/test/test_base.py b/test/test_base.py index 54032ec7c..15ce35fff 100644 --- a/test/test_base.py +++ b/test/test_base.py @@ -1,4 +1,7 @@ import atexit +import json +import logging +import logging.config import os import threading from pathlib import Path @@ -7,41 +10,48 @@ from time import sleep from cli.rpc import rpc_call +from warnet.server import LOGGING_CONFIG_PATH from warnet.utils import exponential_backoff from warnet.warnet import Warnet class TestBase: def __init__(self): - # Warnet server stdout gets logged here - self.tmpdir = Path(mkdtemp(prefix="warnet-test-")) - os.environ["XDG_STATE_HOME"] = f"{self.tmpdir}" - self.logfilepath = self.tmpdir / "warnet" / "warnet.log" + self.setup_environment() + self.setup_logging() + atexit.register(self.cleanup) + self.log.info("Warnet test base initialized") + def setup_environment(self): + self.tmpdir = Path(mkdtemp(prefix="warnet-test-")) + os.environ["XDG_STATE_HOME"] = str(self.tmpdir) + self.logfilepath = self.tmpdir / "warnet.log" # Use the same dir name for the warnet network name # replacing underscores which throws off k8s self.network_name = self.tmpdir.name.replace("_", "") - self.server = None self.server_thread = None self.stop_threads = threading.Event() self.network = True - atexit.register(self.cleanup) - - print("\nWarnet test base started") + def setup_logging(self): + with open(LOGGING_CONFIG_PATH) as f: + logging_config = json.load(f) + logging_config["handlers"]["file"]["filename"] = str(self.logfilepath) + logging.config.dictConfig(logging_config) + self.log = logging.getLogger("TestFramework") + self.log.info("Logging started") def cleanup(self, signum=None, frame=None): if self.server is None: return - try: - print("\nStopping network") + self.log.info("Stopping network") if self.network: self.warcli("network down") self.wait_for_all_tanks_status(target="stopped", timeout=60, interval=1) except Exception as e: - print(f"Error bringing network down: {e}") + self.log.error(f"Error bringing network down: {e}") finally: self.stop_threads.set() self.server.terminate() @@ -49,35 +59,35 @@ def cleanup(self, signum=None, frame=None): self.server_thread.join() self.server = None - # Execute a warcli RPC using command line (always returns string) - def warcli(self, str, network=True): - cmd = ["warcli"] + str.split() + def warcli(self, cmd, network=True): + self.log.debug(f"Executing warcli command: {cmd}") + command = ["warcli"] + cmd.split() if network: - cmd += ["--network", self.network_name] - proc = run(cmd, capture_output=True) - + command += ["--network", self.network_name] + proc = run(command, capture_output=True) if proc.stderr: raise Exception(proc.stderr.decode().strip()) return proc.stdout.decode().strip() - # Execute a warnet RPC API call directly (may return dict or list) - def rpc(self, method, params=None): + def rpc(self, method, params=None) -> dict | list: + """Execute a warnet RPC API call directly""" + self.log.debug(f"Executing RPC method: {method}") return rpc_call(method, params) - # Repeatedly execute an RPC until it succeeds @exponential_backoff(max_retries=20) def wait_for_rpc(self, method, params=None): - return rpc_call(method, params) + """Repeatedly execute an RPC until it succeeds""" + return self.rpc(method, params) - # Read output from server using a thread def output_reader(self, pipe, func): while not self.stop_threads.is_set(): line = pipe.readline().strip() if line: func(line) - # Start the Warnet server and wait for RPC interface to respond def start_server(self): + """Start the Warnet server and wait for RPC interface to respond""" + if self.server is not None: raise Exception("Server is already running") @@ -86,6 +96,7 @@ def start_server(self): # For kubernetes we assume the server is started outside test base # but we can still read its log output + self.log.info("Starting Warnet server") self.server = Popen( ["kubectl", "logs", "-f", "rpc-0"], stdout=PIPE, @@ -94,67 +105,59 @@ def start_server(self): universal_newlines=True, ) - # Create a thread to read the output self.server_thread = threading.Thread( target=self.output_reader, args=(self.server.stdout, print) ) self.server_thread.daemon = True self.server_thread.start() - # doesn't require anything container-related - print("\nWaiting for RPC") + self.log.info("Waiting for RPC") self.wait_for_rpc("scenarios_available") - # Quit def stop_server(self): self.cleanup() def wait_for_predicate(self, predicate, timeout=5 * 60, interval=5): - while True: + self.log.debug(f"Waiting for predicate with timeout {timeout}s and interval {interval}s") + while timeout > 0: if predicate(): - break + return sleep(interval) timeout -= interval - if timeout < 0: - raise Exception("Timed out waiting for predicate Truth") + import inspect + + raise Exception( + f"Timed out waiting for Truth from predicate: {inspect.getsource(predicate).strip()}" + ) def get_tank(self, index): wn = Warnet.from_network(self.network_name) return wn.tanks[index] - # Poll the warnet server for container status - # Block until all tanks are running def wait_for_all_tanks_status(self, target="running", timeout=20 * 60, interval=5): + """Poll the warnet server for container status + Block until all tanks are running + """ + def check_status(): tanks = self.wait_for_rpc("network_status", {"network": self.network_name}) stats = {"total": 0} for tank in tanks: - stats["total"] += 1 - bitcoin_status = tank["bitcoin_status"] - if bitcoin_status not in stats: - stats[bitcoin_status] = 0 - stats[bitcoin_status] += 1 - if "lightning_status" in tank: - stats["total"] += 1 - lightning_status = tank["lightning_status"] - if lightning_status not in stats: - stats[lightning_status] = 0 - stats[lightning_status] += 1 - if "circuitbreaker_status" in tank: - stats["total"] += 1 - circuitbreaker_status = tank["circuitbreaker_status"] - if circuitbreaker_status not in stats: - stats[circuitbreaker_status] = 0 - stats[circuitbreaker_status] += 1 - print(f"Waiting for all tanks to reach '{target}': {stats}") - # All tanks are running, proceed + for service in ["bitcoin", "lightning", "circuitbreaker"]: + status = tank.get(f"{service}_status") + if status: + stats["total"] += 1 + stats[status] = stats.get(status, 0) + 1 + self.log.info(f"Waiting for all tanks to reach '{target}': {stats}") return target in stats and stats[target] == stats["total"] self.wait_for_predicate(check_status, timeout, interval) - # Ensure all tanks have all the connections they are supposed to have - # Block until all success def wait_for_all_edges(self, timeout=20 * 60, interval=5): + """Ensure all tanks have all the connections they are supposed to have + Block until all success + """ + def check_status(): return self.wait_for_rpc("network_connected", {"network": self.network_name}) diff --git a/test/v25_net_test.py b/test/v25_net_test.py deleted file mode 100755 index 52295c09a..000000000 --- a/test/v25_net_test.py +++ /dev/null @@ -1,60 +0,0 @@ -#!/usr/bin/env python3 - -import json -import os -from pathlib import Path - -from test_base import TestBase - -graph_file_path = Path(os.path.dirname(__file__)) / "data" / "12_node_ring.graphml" - -base = TestBase() -base.start_server() -print(base.warcli(f"network start {graph_file_path}")) -base.wait_for_all_tanks_status(target="running") -base.wait_for_all_edges() - -onion_addr = None - - -def wait_for_reachability(): - try: - global onion_addr - info = json.loads(base.warcli("rpc 0 getnetworkinfo")) - for net in info["networks"]: - if net["name"] == "ipv4" and not net["reachable"]: - return False - if net["name"] == "onion" and not net["reachable"]: - return False - if len(info["localaddresses"]) != 2: - return False - for addr in info["localaddresses"]: - assert "100." in addr["address"] or ".onion" in addr["address"] - if ".onion" in addr["address"]: - onion_addr = addr["address"] - return True - except Exception: - return False - - -print("\nChecking IPv4 and onion reachability") -base.wait_for_predicate(wait_for_reachability, timeout=10 * 60) - - -print("\nAttempting addnode to onion peer") -base.warcli(f"rpc 1 addnode {onion_addr} add") - - -def wait_for_onion_peer(): - peers = json.loads(base.warcli("rpc 0 getpeerinfo")) - for peer in peers: - print(f"Waiting for one onion peer: {peer['network']} {peer['addr']}") - if peer["network"] == "onion": - return True - return False - - -# Might take up to 10 minutes -base.wait_for_predicate(wait_for_onion_peer, timeout=10 * 60) - -base.stop_server()