Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/cli/scenarios.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import base64
import os
import sys

import click
Expand Down Expand Up @@ -53,17 +54,23 @@ def run(scenario, network, additional_args):
@scenarios.command(context_settings={"ignore_unknown_options": True})
@click.argument("scenario_path", type=str)
@click.argument("additional_args", nargs=-1, type=click.UNPROCESSED)
@click.option("--name", type=str)
@click.option("--network", default="warnet", show_default=True)
def run_file(scenario_path, network, additional_args):
def run_file(scenario_path, network, additional_args, name=""):
"""
Run <scenario_path> from the Warnet Test Framework on [network] with optional arguments
"""
if not scenario_path.endswith(".py"):
print("Error. Currently only python scenarios are supported")
sys.exit(1)
scenario_name = name if name else os.path.splitext(os.path.basename(scenario_path))[0]
scenario_base64 = ""
with open(scenario_path, "rb") as f:
scenario_base64 = base64.b64encode(f.read()).decode("utf-8")

params = {
"scenario_base64": scenario_base64,
"scenario_name": scenario_name,
"additional_args": additional_args,
"network": network,
}
Expand Down
100 changes: 37 additions & 63 deletions src/warnet/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ def setup_rpc(self):
# Logs
self.jsonrpc.register(self.logs_grep)

def proc_logger(self, proc):
while not proc.stdout:
time.sleep(0.1)
for line in proc.stdout:
self.logger.info(line.decode().rstrip())

def get_warnet(self, network: str) -> Warnet:
"""
Will get a warnet from the cache if it exists.
Expand Down Expand Up @@ -310,57 +316,59 @@ def scenarios_available(self) -> list[tuple]:
self.logger.error(msg)
raise ServerError(message=msg) from e

def scenarios_run_file(
self, scenario_base64: str, additional_args: list[str], network: str = "warnet"
def _start_scenario(
self,
scenario_path: str,
scenario_name: str,
additional_args: list[str],
network: str,
) -> str:
scenario_path = None
with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as temp_file:
scenario_path = temp_file.name

# decode base64 string to binary
scenario_bytes = base64.b64decode(scenario_base64)
# write binary to file
temp_file.write(scenario_bytes)

if not os.path.exists(scenario_path):
raise ServerError(f"Scenario not found at {scenario_path}.")

try:
run_cmd = [sys.executable, scenario_path] + additional_args + [f"--network={network}"]
self.logger.debug(f"Running {run_cmd}")

proc = subprocess.Popen(
run_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)

def proc_logger():
while not proc.stdout:
time.sleep(0.1)
for line in proc.stdout:
self.logger.info(line.decode().rstrip())

t = threading.Thread(target=lambda: proc_logger())
t = threading.Thread(target=lambda: self.proc_logger(proc))
t.daemon = True
t.start()

self.running_scenarios.append(
{
"pid": proc.pid,
"cmd": f"{scenario_path} {' '.join(additional_args)}",
"cmd": f"{scenario_name} {' '.join(additional_args)}",
"proc": proc,
"network": network,
}
)

return f"Running scenario with PID {proc.pid} in the background..."

return f"Running scenario {scenario_name} with PID {proc.pid} in the background..."
except Exception as e:
msg = f"Error running scenario: {e}"
self.logger.error(msg)
raise ServerError(message=msg) from e

def scenarios_run_file(
self,
scenario_base64: str,
scenario_name: str,
additional_args: list[str],
network: str = "warnet",
) -> str:
# Extract just the filename without path and extension
with tempfile.NamedTemporaryFile(
prefix=scenario_name,
suffix=".py",
delete=False,
) as temp_file:
scenario_path = temp_file.name
temp_file.write(base64.b64decode(scenario_base64))

if not os.path.exists(scenario_path):
raise ServerError(f"Scenario not found at {scenario_path}.")

return self._start_scenario(scenario_path, scenario_name, additional_args, network)

def scenarios_run(
self, scenario: str, additional_args: list[str], network: str = "warnet"
) -> str:
Expand All @@ -370,41 +378,7 @@ def scenarios_run(
if not os.path.exists(scenario_path):
raise ServerError(f"Scenario {scenario} not found at {scenario_path}.")

try:
run_cmd = [sys.executable, scenario_path] + additional_args + [f"--network={network}"]
self.logger.debug(f"Running {run_cmd}")

proc = subprocess.Popen(
run_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)

def proc_logger():
while not proc.stdout:
time.sleep(0.1)
for line in proc.stdout:
self.logger.info(line.decode().rstrip())

t = threading.Thread(target=lambda: proc_logger())
t.daemon = True
t.start()

self.running_scenarios.append(
{
"pid": proc.pid,
"cmd": f"{scenario} {' '.join(additional_args)}",
"proc": proc,
"network": network,
}
)

return f"Running scenario {scenario} with PID {proc.pid} in the background..."

except Exception as e:
msg = f"Error running scenario: {e}"
self.logger.error(msg)
raise ServerError(message=msg) from e
return self._start_scenario(scenario_path, scenario, additional_args, network)

def scenarios_stop(self, pid: int) -> str:
matching_scenarios = [sc for sc in self.running_scenarios if sc["pid"] == pid]
Expand Down
9 changes: 9 additions & 0 deletions test/scenarios_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,25 @@ def check_available_scenarios(self):
assert len(scenarios) == 4, f"Expected 4 available scenarios, got {len(scenarios)}"
self.log.info(f"Found {len(scenarios)} available scenarios")

def scenario_running(self, scenario_name: str):
"""Check that we are only running a single scenario of the correct name"""
active = self.rpc("scenarios_list_running")
running = scenario_name in active[0]["cmd"]
return running and len(active) == 1

def run_and_check_scenario(self, scenario_name):
self.log.info(f"Running scenario: {scenario_name}")
self.warcli(f"scenarios run {scenario_name} --allnodes --interval=1")
self.wait_for_predicate(lambda: self.scenario_running(scenario_name))
self.wait_for_predicate(lambda: self.check_blocks(30))
self.stop_scenario()

def run_and_check_scenario_from_file(self, scenario_file):
self.log.info(f"Running scenario from file: {scenario_file}")
self.warcli(f"scenarios run-file {scenario_file} --allnodes --interval=1")
start = int(self.warcli("rpc 0 getblockcount"))
scenario_name = os.path.splitext(os.path.basename(scenario_file))[0]
self.wait_for_predicate(lambda: self.scenario_running(scenario_name))
self.wait_for_predicate(lambda: self.check_blocks(2, start=start))
self.stop_scenario()

Expand Down