From 50ac80d0420e3309af4deaa9a8a35c3d42147f04 Mon Sep 17 00:00:00 2001 From: willcl-ark Date: Mon, 8 Jul 2024 19:03:08 +0100 Subject: [PATCH 1/2] server: refactor proc_logger --- src/warnet/server.py | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/src/warnet/server.py b/src/warnet/server.py index bc77a10a5..4ba50100e 100644 --- a/src/warnet/server.py +++ b/src/warnet/server.py @@ -161,6 +161,12 @@ def setup_rpc(self): # Logs self.jsonrpc.register(self.logs_grep) + def proc_logger(self, proc): + while not proc.stdout: + time.sleep(0.1) + for line in proc.stdout: + self.logger.info(line.decode().rstrip()) + def get_warnet(self, network: str) -> Warnet: """ Will get a warnet from the cache if it exists. @@ -335,13 +341,7 @@ def scenarios_run_file( stderr=subprocess.PIPE, ) - def proc_logger(): - while not proc.stdout: - time.sleep(0.1) - for line in proc.stdout: - self.logger.info(line.decode().rstrip()) - - t = threading.Thread(target=lambda: proc_logger()) + t = threading.Thread(target=lambda: self.proc_logger(proc)) t.daemon = True t.start() @@ -380,13 +380,7 @@ def scenarios_run( stderr=subprocess.PIPE, ) - def proc_logger(): - while not proc.stdout: - time.sleep(0.1) - for line in proc.stdout: - self.logger.info(line.decode().rstrip()) - - t = threading.Thread(target=lambda: proc_logger()) + t = threading.Thread(target=lambda: self.proc_logger(proc)) t.daemon = True t.start() From ac4e55831389003a83d1e40e5fdc8b69c723af53 Mon Sep 17 00:00:00 2001 From: willcl-ark Date: Mon, 8 Jul 2024 09:04:21 +0100 Subject: [PATCH 2/2] scenarios: use filenames in listing when remote --- src/cli/scenarios.py | 9 ++++- src/warnet/server.py | 80 ++++++++++++++++-------------------------- test/scenarios_test.py | 9 +++++ 3 files changed, 47 insertions(+), 51 deletions(-) diff --git a/src/cli/scenarios.py b/src/cli/scenarios.py index 9d19e32f5..3ba5bd9b9 100644 --- a/src/cli/scenarios.py +++ b/src/cli/scenarios.py @@ -1,4 +1,5 @@ import base64 +import os import sys import click @@ -53,17 +54,23 @@ def run(scenario, network, additional_args): @scenarios.command(context_settings={"ignore_unknown_options": True}) @click.argument("scenario_path", type=str) @click.argument("additional_args", nargs=-1, type=click.UNPROCESSED) +@click.option("--name", type=str) @click.option("--network", default="warnet", show_default=True) -def run_file(scenario_path, network, additional_args): +def run_file(scenario_path, network, additional_args, name=""): """ Run from the Warnet Test Framework on [network] with optional arguments """ + if not scenario_path.endswith(".py"): + print("Error. Currently only python scenarios are supported") + sys.exit(1) + scenario_name = name if name else os.path.splitext(os.path.basename(scenario_path))[0] scenario_base64 = "" with open(scenario_path, "rb") as f: scenario_base64 = base64.b64encode(f.read()).decode("utf-8") params = { "scenario_base64": scenario_base64, + "scenario_name": scenario_name, "additional_args": additional_args, "network": network, } diff --git a/src/warnet/server.py b/src/warnet/server.py index 4ba50100e..2572e8ae8 100644 --- a/src/warnet/server.py +++ b/src/warnet/server.py @@ -316,51 +316,59 @@ def scenarios_available(self) -> list[tuple]: self.logger.error(msg) raise ServerError(message=msg) from e - def scenarios_run_file( - self, scenario_base64: str, additional_args: list[str], network: str = "warnet" + def _start_scenario( + self, + scenario_path: str, + scenario_name: str, + additional_args: list[str], + network: str, ) -> str: - scenario_path = None - with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as temp_file: - scenario_path = temp_file.name - - # decode base64 string to binary - scenario_bytes = base64.b64decode(scenario_base64) - # write binary to file - temp_file.write(scenario_bytes) - - if not os.path.exists(scenario_path): - raise ServerError(f"Scenario not found at {scenario_path}.") - try: run_cmd = [sys.executable, scenario_path] + additional_args + [f"--network={network}"] self.logger.debug(f"Running {run_cmd}") - proc = subprocess.Popen( run_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) - t = threading.Thread(target=lambda: self.proc_logger(proc)) t.daemon = True t.start() - self.running_scenarios.append( { "pid": proc.pid, - "cmd": f"{scenario_path} {' '.join(additional_args)}", + "cmd": f"{scenario_name} {' '.join(additional_args)}", "proc": proc, "network": network, } ) - - return f"Running scenario with PID {proc.pid} in the background..." - + return f"Running scenario {scenario_name} with PID {proc.pid} in the background..." except Exception as e: msg = f"Error running scenario: {e}" self.logger.error(msg) raise ServerError(message=msg) from e + def scenarios_run_file( + self, + scenario_base64: str, + scenario_name: str, + additional_args: list[str], + network: str = "warnet", + ) -> str: + # Extract just the filename without path and extension + with tempfile.NamedTemporaryFile( + prefix=scenario_name, + suffix=".py", + delete=False, + ) as temp_file: + scenario_path = temp_file.name + temp_file.write(base64.b64decode(scenario_base64)) + + if not os.path.exists(scenario_path): + raise ServerError(f"Scenario not found at {scenario_path}.") + + return self._start_scenario(scenario_path, scenario_name, additional_args, network) + def scenarios_run( self, scenario: str, additional_args: list[str], network: str = "warnet" ) -> str: @@ -370,35 +378,7 @@ def scenarios_run( if not os.path.exists(scenario_path): raise ServerError(f"Scenario {scenario} not found at {scenario_path}.") - try: - run_cmd = [sys.executable, scenario_path] + additional_args + [f"--network={network}"] - self.logger.debug(f"Running {run_cmd}") - - proc = subprocess.Popen( - run_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - - t = threading.Thread(target=lambda: self.proc_logger(proc)) - t.daemon = True - t.start() - - self.running_scenarios.append( - { - "pid": proc.pid, - "cmd": f"{scenario} {' '.join(additional_args)}", - "proc": proc, - "network": network, - } - ) - - return f"Running scenario {scenario} with PID {proc.pid} in the background..." - - except Exception as e: - msg = f"Error running scenario: {e}" - self.logger.error(msg) - raise ServerError(message=msg) from e + return self._start_scenario(scenario_path, scenario, additional_args, network) def scenarios_stop(self, pid: int) -> str: matching_scenarios = [sc for sc in self.running_scenarios if sc["pid"] == pid] diff --git a/test/scenarios_test.py b/test/scenarios_test.py index 9c6af559c..5b1e56f67 100755 --- a/test/scenarios_test.py +++ b/test/scenarios_test.py @@ -36,9 +36,16 @@ def check_available_scenarios(self): assert len(scenarios) == 4, f"Expected 4 available scenarios, got {len(scenarios)}" self.log.info(f"Found {len(scenarios)} available scenarios") + def scenario_running(self, scenario_name: str): + """Check that we are only running a single scenario of the correct name""" + active = self.rpc("scenarios_list_running") + running = scenario_name in active[0]["cmd"] + return running and len(active) == 1 + def run_and_check_scenario(self, scenario_name): self.log.info(f"Running scenario: {scenario_name}") self.warcli(f"scenarios run {scenario_name} --allnodes --interval=1") + self.wait_for_predicate(lambda: self.scenario_running(scenario_name)) self.wait_for_predicate(lambda: self.check_blocks(30)) self.stop_scenario() @@ -46,6 +53,8 @@ def run_and_check_scenario_from_file(self, scenario_file): self.log.info(f"Running scenario from file: {scenario_file}") self.warcli(f"scenarios run-file {scenario_file} --allnodes --interval=1") start = int(self.warcli("rpc 0 getblockcount")) + scenario_name = os.path.splitext(os.path.basename(scenario_file))[0] + self.wait_for_predicate(lambda: self.scenario_running(scenario_name)) self.wait_for_predicate(lambda: self.check_blocks(2, start=start)) self.stop_scenario()