From 86a0908f84cfd490841b8a638123673ecdfc2129 Mon Sep 17 00:00:00 2001 From: alexstroke Date: Tue, 12 Mar 2024 14:17:48 +0100 Subject: [PATCH] [feature] Add event listening functions Signed-off-by: alexstroke <111361420+astrokov7@users.noreply.github.com> --- .github/workflows/iroha2-dev-pr.yml | 1 + client_cli/pytests/common/helpers.py | 14 +-- client_cli/pytests/common/settings.py | 3 - client_cli/pytests/src/client_cli/__init__.py | 6 +- .../pytests/src/client_cli/client_cli.py | 95 ++++++++++++++++++- .../pytests/src/client_cli/configuration.py | 71 +++++++------- client_cli/pytests/test/conftest.py | 5 +- 7 files changed, 139 insertions(+), 56 deletions(-) diff --git a/.github/workflows/iroha2-dev-pr.yml b/.github/workflows/iroha2-dev-pr.yml index 0b6d47e7428..18ffd766f5f 100644 --- a/.github/workflows/iroha2-dev-pr.yml +++ b/.github/workflows/iroha2-dev-pr.yml @@ -7,6 +7,7 @@ on: - '**.rs' - '**.json' - '**.toml' + - '**.py' - '.github/workflows/iroha2-dev-pr.yml' concurrency: diff --git a/client_cli/pytests/common/helpers.py b/client_cli/pytests/common/helpers.py index 6145ea35557..7133f013d37 100644 --- a/client_cli/pytests/common/helpers.py +++ b/client_cli/pytests/common/helpers.py @@ -29,15 +29,15 @@ def extract_hash(stdout): return match.group(1) if match else None -def get_peers_config_files(path_to_configs): +def get_peers_ports_list(port_min=8080, port_max=8083): """ - Returns a list of config file paths from the given directory. + Returns a list of peer ports within the specified range. + + port_min (int): The minimum port number in the range. Default is 8080. + port_max (int): The maximum port number in the range. Default is 8083. """ - config_files = [] - for entry in os.listdir(path_to_configs): - if entry.endswith(".json") and "config_to_peer" in entry: - config_files.append(os.path.join(path_to_configs, entry)) - return config_files + + return [port for port in range(port_min, port_max + 1)] def read_isi_from_json(file_path): diff --git a/client_cli/pytests/common/settings.py b/client_cli/pytests/common/settings.py index 3593f91d0b7..1ba108cd830 100644 --- a/client_cli/pytests/common/settings.py +++ b/client_cli/pytests/common/settings.py @@ -11,11 +11,8 @@ BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -ROOT_DIR = os.environ.get("CLIENT_CLI_DIR", BASE_DIR) - PATH_CONFIG_CLIENT_CLI = os.environ["CLIENT_CLI_CONFIG"] CLIENT_CLI_PATH = os.environ["CLIENT_CLI_BINARY"] -PEERS_CONFIGS_PATH = os.path.join(ROOT_DIR, "peers_configs") PORT_MIN = int(os.getenv("TORII_API_PORT_MIN", "8080")) PORT_MAX = int(os.getenv("TORII_API_PORT_MAX", "8083")) diff --git a/client_cli/pytests/src/client_cli/__init__.py b/client_cli/pytests/src/client_cli/__init__.py index dd9bea09b2a..217415e8bb7 100644 --- a/client_cli/pytests/src/client_cli/__init__.py +++ b/client_cli/pytests/src/client_cli/__init__.py @@ -3,9 +3,9 @@ """ from common.settings import PATH_CONFIG_CLIENT_CLI, PORT_MAX, PORT_MIN -from src.client_cli.client_cli import ClientCli -from src.client_cli.configuration import Config -from src.client_cli.iroha import Iroha +from .client_cli import ClientCli +from .configuration import Config +from .iroha import Iroha config = Config(PORT_MIN, PORT_MAX) config.load(PATH_CONFIG_CLIENT_CLI) diff --git a/client_cli/pytests/src/client_cli/client_cli.py b/client_cli/pytests/src/client_cli/client_cli.py index fd751c77f65..dec7b628c3d 100644 --- a/client_cli/pytests/src/client_cli/client_cli.py +++ b/client_cli/pytests/src/client_cli/client_cli.py @@ -3,16 +3,19 @@ commands for interacting with Iroha blockchain using the Iroha command-line client. """ +import json import shlex import subprocess +import threading +from json import JSONDecoder from pathlib import Path from time import monotonic, sleep -from typing import Callable +from typing import Callable, Dict, Any import allure # type: ignore from common.helpers import extract_hash, read_isi_from_json, write_isi_to_json -from common.settings import BASE_DIR, CLIENT_CLI_PATH, PATH_CONFIG_CLIENT_CLI, ROOT_DIR +from common.settings import BASE_DIR, CLIENT_CLI_PATH, PATH_CONFIG_CLIENT_CLI from src.client_cli.configuration import Config @@ -35,6 +38,9 @@ def __init__(self, config: Config): self.stderr = None self.transaction_hash = None self._timeout = 20 + self.event_data: Dict[str, Any] = {} + self.event_data_lock = threading.Lock() + self.should_continue_listening = True def __enter__(self): """ @@ -53,6 +59,37 @@ def __exit__(self, exc_type, exc_val, exc_tb): """ self.reset() + def start_listening_to_events(self, peers_ports): + """ + Initializes listening to events on all peers. + """ + self.transaction_status = {} + self.threads = [] + for port in peers_ports: + self.config.update_torii_url(port) + thread = threading.Thread(target=self.listen_to_events, args=(port,)) + self.threads.append(thread) + thread.start() + + def listen_to_events(self, config_path): + """ + Listens to the events using the specified configuration file and stores them. + """ + command = [self.BASE_PATH] + ["--config=" + config_path, "events", "pipeline"] + with subprocess.Popen(command, stdout=subprocess.PIPE, text=True) as process: + while self.should_continue_listening: + output = process.stdout.readline() + if not output: + break + with self.event_data_lock: + if config_path in self.event_data: + self.event_data[config_path] += output + else: + self.event_data[config_path] = output + + def stop_listening(self): + self.should_continue_listening = False + def wait_for(self, condition: Callable[[], bool], timeout=None): """ Wait for a certain condition to be met, specified by the expected and actual values. @@ -278,7 +315,7 @@ def register_trigger(self, account): trigger_data = read_isi_from_json(str(json_template_path)) trigger_data[0]["Register"]["Trigger"]["action"]["authority"] = str(account) - json_temp_file_path = Path(ROOT_DIR) / "isi_register_trigger.json" + json_temp_file_path = Path(CLIENT_CLI_PATH) / "isi_register_trigger.json" write_isi_to_json(trigger_data, str(json_temp_file_path)) self._execute_pipe( @@ -306,7 +343,7 @@ def unregister_asset(self, asset_id): asset_data = read_isi_from_json(str(json_template_path)) asset_data[0]["Unregister"]["Asset"]["object_id"] = str(asset_id) - json_temp_file_path = Path(ROOT_DIR) / "isi_unregister_asset.json" + json_temp_file_path = Path(CLIENT_CLI_PATH) / "isi_unregister_asset.json" write_isi_to_json(asset_data, str(json_temp_file_path)) self._execute_pipe( @@ -338,7 +375,10 @@ def execute(self, command=None): if command is None: command = self.command else: - command = [self.BASE_PATH] + self.BASE_FLAGS + shlex.split(command) + if isinstance(command, str): + command = [self.BASE_PATH] + self.BASE_FLAGS + shlex.split(command) + elif isinstance(command, list): + command = [self.BASE_PATH] + self.BASE_FLAGS + command if "|" in command: pipe_index = command.index("|") @@ -373,6 +413,7 @@ def _execute_single(self, command): """ Executes a single command. """ + print(" ".join(command) + "\n") with subprocess.Popen( command, stdout=subprocess.PIPE, @@ -384,6 +425,50 @@ def _execute_single(self, command): self.transaction_hash = extract_hash(self.stdout) self._attach_allure_reports() + def wait_for_transaction_commit(self, transaction_hash, timeout=1): + """ + Waits for the transaction with the given hash to be committed in all configs. + """ + + def is_transaction_committed(): + return self.is_transaction_committed(transaction_hash) + + try: + self.wait_for(is_transaction_committed, timeout) + return True + except TimeoutError: + return False + + def is_transaction_committed(self, transaction_hash): + """ + Checks if the transaction with the given hash is committed in all configs. + """ + with self.event_data_lock: + for config_path, data in self.event_data.items(): + if not self._check_commit_in_output(transaction_hash, data): + return False + return True + + def _check_commit_in_output(self, transaction_hash, output): + """ + Parses the output to check if the transaction with the given hash is committed. + """ + decoder = JSONDecoder() + idx = 0 + try: + while idx < len(output): + obj, idx_next = decoder.raw_decode(output[idx:]) + if ( + obj.get("Pipeline", {}).get("entity_kind") == "Transaction" + and obj.get("Pipeline", {}).get("status") == "Committed" + and obj.get("Pipeline", {}).get("hash") == transaction_hash + ): + return True + idx += idx_next + except json.JSONDecodeError: + return False + return False + def _attach_allure_reports(self): """ Attaches stdout and stderr to Allure reports. diff --git a/client_cli/pytests/src/client_cli/configuration.py b/client_cli/pytests/src/client_cli/configuration.py index 5e04cdfbecd..b89ba26d574 100644 --- a/client_cli/pytests/src/client_cli/configuration.py +++ b/client_cli/pytests/src/client_cli/configuration.py @@ -4,7 +4,6 @@ import tomlkit import glob -import json import os import random from urllib.parse import urlparse @@ -12,9 +11,10 @@ class Config: """ - Configuration class to handle Iroha network configuration. The class provides methods for loading - the configuration from a file, accessing the configuration values, and randomising Torii URL - to access different peers. + Configuration class to handle Iroha network configuration. + The class provides methods for loading the configuration from a file, + accessing the configuration values, + and randomising Torii URL to access different peers. :param port_min: The minimum port number for the TORII_API_URL. :type port_min: int @@ -35,48 +35,30 @@ def load(self, path_config_client_cli): :param path_config_client_cli: The path to the configuration file. :type path_config_client_cli: str - :raises IOError: If the file does not exist. + :raises IOError: If the file does not exist or is not a file. + :raises ValueError: If the configuration file is invalid. """ if not os.path.exists(path_config_client_cli): raise IOError(f"No config file found at {path_config_client_cli}") - if not os.path.isfile(path_config_client_cli): raise IOError(f"The path is not a file: {path_config_client_cli}") - with open(path_config_client_cli, "r", encoding="utf-8") as config_file: - self._config = tomlkit.load(config_file) - self.file = path_config_client_cli - - def generate_by_peers(self, peers_configs_dir): - """ - Generate configuration files for each port in the range from port_min to port_max. - """ - if self._config is None: - raise ValueError( - "No configuration loaded. Use load() method to load the configuration." - ) - - if self.port_min >= self.port_max: - raise ValueError("port_min must be less than port_max.") - - os.makedirs(peers_configs_dir, exist_ok=True) + try: + with open(path_config_client_cli, "r", encoding="utf-8") as config_file: + self._config = tomlkit.load(config_file) + except Exception as e: + raise ValueError(f"Error reading configuration file: {e}") - for port in range(self.port_min, self.port_max + 1): - config_copy = self._config.copy() - config_copy["TORII_API_URL"] = f"http://localhost:{port}" - file_name = f"config_to_peer_{port}.json" - file_path = os.path.join(peers_configs_dir, file_name) - with open(file_path, "w", encoding="utf-8") as config_file: - json.dump(config_copy, config_file, indent=4) + self.file = path_config_client_cli - def select_random_peer_config(self): + def select_random_peer_config(self, peers_configs_dir): """ Select and load a random configuration file generated by the generate_by_peers method. This updates the current configuration to the one chosen. :return: None """ - peers_configs = glob.glob("path/to/peers/configs/*.json") + peers_configs = glob.glob(peers_configs_dir + "/*.toml") if not peers_configs: raise ValueError( "Peer configuration files not found. First generate them using generate_by_peers." @@ -88,17 +70,34 @@ def select_random_peer_config(self): def randomise_torii_url(self): """ - Update Torii URL. + Randomise the Torii URL port. Note that in order for update to take effect, `self.env` should be used when executing the client cli. :return: None """ - parsed_url = urlparse(self._config["torii_url"]) random_port = random.randint(self.port_min, self.port_max) - self._envs["TORII_URL"] = parsed_url._replace( - netloc=f"{parsed_url.hostname}:{random_port}" + self.update_torii_url(random_port) + + def update_torii_url(self, port): + """ + Update the Torii URL in the current configuration. + + :param port: Port to use in the Torii URL. + :type port: int + :raises ValueError: If the port is outside the allowed range. + """ + if port < self.port_min or port > self.port_max: + raise ValueError("Port is out of allowed range.") + + if self._config is None: + raise ValueError("No configuration loaded. Use load() method first.") + + parsed_url = urlparse(self._config["torii_url"]) + updated_url = parsed_url._replace( + netloc=f"{parsed_url.hostname}:{port}" ).geturl() + self._envs["TORII_URL"] = updated_url @property def torii_url(self): diff --git a/client_cli/pytests/test/conftest.py b/client_cli/pytests/test/conftest.py index fbfb96ee4b8..46a62c74df6 100644 --- a/client_cli/pytests/test/conftest.py +++ b/client_cli/pytests/test/conftest.py @@ -14,13 +14,14 @@ generate_random_string_with_reserved_char, generate_random_string_with_whitespace, generate_random_string_without_reserved_chars, + get_peers_ports_list, key_with_invalid_character_in_key, name_with_uppercase_letter, not_existing_name, random, string, ) -from common.settings import PEERS_CONFIGS_PATH +from common.settings import PORT_MIN, PORT_MAX from models import Account, Asset, AssetDefinition, Domain from src.client_cli import client_cli, config @@ -31,7 +32,7 @@ def before_all(): """Initial setup for all test sessions. This fixture generates configurations based on peers and is automatically used for every test session.""" - config.generate_by_peers(PEERS_CONFIGS_PATH) + client_cli.start_listening_to_events(get_peers_ports_list(PORT_MIN, PORT_MAX)) yield