Skip to content
30 changes: 18 additions & 12 deletions src/backend/kubernetes_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from kubernetes.dynamic import DynamicClient
from kubernetes.dynamic.exceptions import NotFoundError, ResourceNotFoundError
from kubernetes.stream import stream
from warnet.services import ServiceType, services
from warnet.services import SERVICES, ServiceType
from warnet.status import RunningStatus
from warnet.tank import Tank
from warnet.utils import parse_raw_messages
Expand Down Expand Up @@ -81,14 +81,17 @@ def down(self, warnet) -> bool:
self.remove_prometheus_service_monitors(warnet.tanks)

for service_name in warnet.services:
self.client.delete_namespaced_pod(
self.get_service_pod_name(services[service_name]["container_name_suffix"]),
self.namespace,
)
self.client.delete_namespaced_service(
self.get_service_service_name(services[service_name]["container_name_suffix"]),
self.namespace,
)
try:
self.client.delete_namespaced_pod(
self.get_service_pod_name(SERVICES[service_name]["container_name_suffix"]),
self.namespace,
)
self.client.delete_namespaced_service(
self.get_service_service_name(SERVICES[service_name]["container_name_suffix"]),
self.namespace,
)
except Exception as e:
self.log.error(f"Could not delete service: {service_name}:\n{e}")

return True

Expand Down Expand Up @@ -724,7 +727,10 @@ def deploy_pods(self, warnet):
self.apply_prometheus_service_monitors(warnet.tanks)

for service_name in warnet.services:
self.service_from_json(services[service_name])
try:
self.service_from_json(SERVICES[service_name])
except Exception as e:
self.log.error(f"Error starting service: {service_name}\n{e}")

self.log.debug("Containers and services created. Configuring IP addresses")
# now that the pods have had a second to create,
Expand All @@ -738,7 +744,7 @@ def deploy_pods(self, warnet):
pod_name = self.get_pod_name(tank.index, ServiceType.BITCOIN)
pod = self.get_pod(pod_name)
if pod is None or pod.status is None or getattr(pod.status, "pod_ip", None) is None:
print("Waiting for pod response or pod IP...")
self.log.info("Waiting for pod response or pod IP...")
time.sleep(3)
continue
pod_ip = pod.status.pod_ip
Expand Down Expand Up @@ -831,7 +837,7 @@ def service_from_json(self, obj):
self.client.create_namespaced_service(namespace=self.namespace, body=service_service)

def write_service_config(self, source_path: str, service_name: str, destination_path: str):
obj = services[service_name]
obj = SERVICES[service_name]
container_name = "sidecar"
# Copy the archive from our local drive (Warnet RPC container/pod)
# to the destination service's sidecar container via ssh
Expand Down
8 changes: 4 additions & 4 deletions src/logging_config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
"disable_existing_loggers": false,
"formatters": {
"simple": {
"format": " %(asctime)s - %(levelname)s - %(message)s",
"datefmt": "%Y-%m-%dT%H:%M:%S%z"
"format": "%(asctime)s %(levelname)-7s │ %(name)-17s │ %(message)s",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would prefer to leave out | to save a bit more space, and limit (name) as well, i bet we can cap it at 8 chars and come up with short little nicknames for each module

"datefmt": "%Y-%m-%d %H:%M:%S"
},
"detailed": {
"format": " %(asctime)s - %(levelname)s - [%(module)s|L%(lineno)d] - %(message)s",
"datefmt": "%Y-%m-%dT%H:%M:%S%z"
"format": "%(asctime)s %(levelname)-7s │ [%(module)21s:%(lineno)4d] │ %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S"
}
},
"filters": {
Expand Down
6 changes: 0 additions & 6 deletions src/warnet/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,10 @@ def healthy(self):

def setup_logging(self):
os.makedirs(os.path.dirname(self.log_file_path), exist_ok=True)

with open(LOGGING_CONFIG_PATH) as f:
logging_config = json.load(f)

# Update log file path
logging_config["handlers"]["file"]["filename"] = str(self.log_file_path)

# Apply the config
logging.config.dictConfig(logging_config)

self.logger = logging.getLogger("warnet")
self.logger.info("Logging started")

Expand Down
2 changes: 1 addition & 1 deletion src/warnet/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class ServiceType(Enum):
CIRCUITBREAKER = 3


services = {
SERVICES = {
# "forkobserver": {
# "image": "b10c/fork-observer:latest",
# "container_name_suffix": "fork-observer",
Expand Down
10 changes: 4 additions & 6 deletions src/warnet/test_framework_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,14 @@ def handle_sigterm(self, signum, frame):

def setup(self):
signal.signal(signal.SIGTERM, self.handle_sigterm)

# Must setuup warnet first to avoid double formatting
Copy link
Contributor

@pinheadmz pinheadmz Jul 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setuuuuuuup!

self.warnet = Warnet.from_network(self.options.network)
# hacked from _start_logging()
# Scenarios will log plain messages to stdout only, which will can redirected by warnet
self.log = logging.getLogger()
self.log = logging.getLogger("WarnetTestFramework")
self.log.setLevel(logging.INFO) # set this to DEBUG to see ALL RPC CALLS
ch = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter(fmt="%(message)s")
ch.setFormatter(formatter)
self.log.addHandler(ch)

self.warnet = Warnet.from_network(self.options.network)
for i, tank in enumerate(self.warnet.tanks):
ip = tank.ipv4
self.log.info(f"Adding TestNode {i} from tank {tank.index} with IP {ip}")
Expand Down
74 changes: 48 additions & 26 deletions test/build_branch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,52 @@

from test_base import TestBase

graph_file_path = Path(os.path.dirname(__file__)) / "data" / "build_v24_test.graphml"

base = TestBase()
base.start_server()
print(base.warcli(f"network start {graph_file_path}"))
base.wait_for_all_tanks_status(target="running", timeout=10 * 60)
base.wait_for_all_edges()

print("\nWait for p2p connections")


def check_peers():
info0 = json.loads(base.warcli("rpc 0 getpeerinfo"))
info1 = json.loads(base.warcli("rpc 1 getpeerinfo"))
print(f"Waiting for both nodes to get one peer: node0: {len(info0)}, node1: {len(info1)}")
return len(info0) == 1 and len(info1) == 1


base.wait_for_predicate(check_peers)

print("\nCheck build flags were processed")
release_help = base.get_tank(0).exec("bitcoind -h")
build_help = base.get_tank(1).exec("bitcoind -h")
assert "zmqpubhashblock" in release_help
assert "zmqpubhashblock" not in build_help

base.stop_server()
class BuildBranchTest(TestBase):
def __init__(self):
super().__init__()
self.graph_file_path = Path(os.path.dirname(__file__)) / "data" / "build_v24_test.graphml"

def run_test(self):
self.start_server()
try:
self.setup_network()
self.wait_for_p2p_connections()
self.check_build_flags()
finally:
self.stop_server()

def setup_network(self):
self.log.info("Setting up network")
self.log.info(self.warcli(f"network start {self.graph_file_path}"))
self.wait_for_all_tanks_status(target="running", timeout=10 * 60)
self.wait_for_all_edges()

def wait_for_p2p_connections(self):
self.log.info("Waiting for P2P connections")
self.wait_for_predicate(self.check_peers, timeout=5 * 60)

def check_peers(self):
info0 = json.loads(self.warcli("rpc 0 getpeerinfo"))
info1 = json.loads(self.warcli("rpc 1 getpeerinfo"))
self.log.debug(
f"Waiting for both nodes to get one peer: node0: {len(info0)}, node1: {len(info1)}"
)
return len(info0) == 1 and len(info1) == 1

def check_build_flags(self):
self.log.info("Checking build flags")
release_help = self.get_tank(0).exec("bitcoind -h")
build_help = self.get_tank(1).exec("bitcoind -h")

assert "zmqpubhashblock" in release_help, "zmqpubhashblock not found in release help"
assert (
"zmqpubhashblock" not in build_help
), "zmqpubhashblock found in build help, but it shouldn't be"

self.log.info("Build flags check passed")


if __name__ == "__main__":
test = BuildBranchTest()
test.run_test()
81 changes: 55 additions & 26 deletions test/dag_connection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,58 @@

from test_base import TestBase

graph_file_path = Path(os.path.dirname(__file__)) / "data" / "ten_semi_unconnected.graphml"

base = TestBase()

base.start_server()
print(base.warcli(f"network start {graph_file_path}"))
base.wait_for_all_tanks_status(target="running")
base.wait_for_all_edges()

# Start scenario
base.warcli("scenarios run-file test/framework_tests/connect_dag.py")

counter = 0
seconds = 180
while (
len(base.rpc("scenarios_list_running")) == 1 and base.rpc("scenarios_list_running")[0]["active"]
):
time.sleep(1)
counter += 1
if counter > seconds:
pid = base.rpc("scenarios_list_running")[0]["pid"]
base.warcli(f"scenarios stop {pid}", False)
print(f"{os.path.basename(__file__)} more than {seconds} seconds")
assert counter < seconds

base.stop_server()

class DAGConnectionTest(TestBase):
def __init__(self):
super().__init__()
self.graph_file_path = (
Path(os.path.dirname(__file__)) / "data" / "ten_semi_unconnected.graphml"
)
self.scenario_timeout = 180 # seconds

def run_test(self):
self.start_server()
try:
self.setup_network()
self.run_connect_dag_scenario()
finally:
self.stop_server()

def setup_network(self):
self.log.info("Setting up network")
self.log.info(self.warcli(f"network start {self.graph_file_path}"))
self.wait_for_all_tanks_status(target="running")
self.wait_for_all_edges()

def run_connect_dag_scenario(self):
self.log.info("Running connect_dag scenario")
self.warcli("scenarios run-file test/framework_tests/connect_dag.py")

start_time = time.time()
while time.time() - start_time < self.scenario_timeout:
running_scenarios = self.rpc("scenarios_list_running")
if not running_scenarios:
self.log.info("Scenario completed successfully")
return

if len(running_scenarios) == 1 and not running_scenarios[0]["active"]:
self.log.info("Scenario completed successfully")
return

time.sleep(1)

self.log.error(f"Scenario did not complete within {self.scenario_timeout} seconds")
self.stop_running_scenario()
raise AssertionError(f"Scenario timed out after {self.scenario_timeout} seconds")

def stop_running_scenario(self):
running_scenarios = self.rpc("scenarios_list_running")
if running_scenarios:
pid = running_scenarios[0]["pid"]
self.log.warning(f"Stopping scenario with PID {pid}")
self.warcli(f"scenarios stop {pid}", False)


if __name__ == "__main__":
test = DAGConnectionTest()
test.run_test()
2 changes: 1 addition & 1 deletion test/data/services.graphml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<key id="source_policy" attr.name="source_policy" attr.type="string" for="edge" />
<key id="target_policy" attr.name="target_policy" attr.type="string" for="edge" />
<graph edgedefault="directed">
<data key="services">cadvisor forkobserver addrmanobserver grafana nodeexporter prometheus</data>
<!-- <data key="services">cadvisor forkobserver addrmanobserver grafana nodeexporter prometheus</data> -->
<node id="0">
<data key="version">27.0</data>
<data key="bitcoin_config">-uacomment=w0 -debug=validation</data>
Expand Down
Loading