From fa859bf0e6c74c2d6f203104b84089f69a2bdcba Mon Sep 17 00:00:00 2001 From: Chris Harris Date: Thu, 24 Apr 2025 11:59:09 +0100 Subject: [PATCH 1/9] Refactoring workloads: new classes Signed-off-by: Chris Harris(harriscr@uk.ibm.com) --- cli_options.py | 53 ++++++++++++ command/__init__.py | 0 command/command.py | 99 ++++++++++++++++++++++ command/fio_command.py | 178 +++++++++++++++++++++++++++++++++++++++ workloads/__init__.py | 0 workloads/workload.py | 183 +++++++++++++++++++++++++++++++++++++++++ workloads/workloads.py | 114 +++++++++++++++++++++++++ 7 files changed, 627 insertions(+) create mode 100644 cli_options.py create mode 100644 command/__init__.py create mode 100644 command/command.py create mode 100644 command/fio_command.py create mode 100644 workloads/__init__.py create mode 100644 workloads/workload.py create mode 100644 workloads/workloads.py diff --git a/cli_options.py b/cli_options.py new file mode 100644 index 00000000..e1bdbb4e --- /dev/null +++ b/cli_options.py @@ -0,0 +1,53 @@ +""" +A class to encapsulate a set of configuration options that can be used to +construct the CLI to use to run a benchmark +""" + +from collections import UserDict +from logging import Logger, getLogger +from typing import Optional + +log: Logger = getLogger("cbt") + + +class CliOptions(UserDict[str, Optional[str]]): + def __setitem__(self, key: str, value: Optional[str]) -> None: + """ + Add an entry to the configuration. + Will report an error if key already exists + """ + if key not in self.data.keys(): + self.data[key] = value + else: + log.warning("Not adding %s:%s to configuration. A value is already set", key, value) + + def __update__(self, key_value_pair: tuple[str, str]) -> None: + """ + Update an existing entry in the configuration. + If the entry exists then don't update it + """ + key, value = key_value_pair + if key not in self.data.keys(): + self.data[key] = value + else: + log.warning("Not Updating %s:%s in configuration. 
Value already exists", key, value) + + def __getitem__(self, key: str) -> Optional[str]: + """ + Get the value for key in the configuration. + Return None and log a warning if the key does not exist + """ + if key in self.data.keys(): + return self.data[key] + else: + log.warning("Key %s does not exist in configuration", key) + return None + + def clear(self) -> None: + """ + Clear the configuration + """ + self.data = {} + + def __str__(self) -> str: + return super().__str__() diff --git a/command/__init__.py b/command/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/command/command.py b/command/command.py new file mode 100644 index 00000000..a5435c8c --- /dev/null +++ b/command/command.py @@ -0,0 +1,99 @@ +""" +A class to deal with a command that will run a single instance of +a benchmark executable + +It will return the full executable string that can be used to run a +cli command using whatever method the Benchmark chooses +""" + +from abc import ABCMeta, abstractmethod +from logging import Logger, getLogger +from typing import Optional + +from cli_options import CliOptions + +log: Logger = getLogger("cbt") + + +class Command(metaclass=ABCMeta): + """ + A class that encapsulates a single CLI command that can be run on a + system + """ + + def __init__(self, options: dict[str, str]) -> None: + self._executable: Optional[str] = None + self._output_directory: str = "" + self._options: CliOptions = self._parse_options(options) + + @abstractmethod + def _parse_options(self, options: dict[str, str]) -> CliOptions: + """ + Take the options passed in from the configuration yaml file and + convert them to a list of key/value pairs that match the parameters + to pass to the benchmark executable + """ + + @abstractmethod + def _generate_full_command(self) -> str: + """ + generate the full cli command that will be sent to the client + to run the benchmark + """ + + @abstractmethod + def _parse_global_options(self, options: dict[str, str]) -> 
CliOptions: + """ + Parse the set of global options into the correct format for the command type + """ + + @abstractmethod + def _generate_output_directory_path(self) -> str: + """ + Generate the part of the output directory that is relevant to this + specific command. + + The format is dependent on the specific Command implementation + """ + + def get(self) -> str: + """ + get the full cli string that can be sent to a system. + + This string contains all the options for a single run of the + benchmark executable + """ + if self._executable is None: + log.error("Executable has not yet been set for this command.") + return "" + + return self._generate_full_command() + + def get_output_directory(self) -> str: + """ + Return the output directory that will be used for this command + """ + return self._output_directory + + def set_executable(self, executable_path: str) -> None: + """ + set the executable to be used for this command + """ + self._executable = executable_path + + def set_global_options(self, global_options: dict[str, str]) -> None: + """ + Update the global options + """ + self._options.update(self._parse_global_options(global_options)) + + def update_options(self, new_options: dict[str, str]) -> None: + """ + Update the command with the new_options dictionary + """ + self._options.update(new_options) + for key, value in new_options.items(): + if key not in self._options.keys(): + self._options[key] = value + else: + log.debug("key %s already exists. 
Not overwriting", key) diff --git a/command/fio_command.py b/command/fio_command.py new file mode 100644 index 00000000..e880d9d6 --- /dev/null +++ b/command/fio_command.py @@ -0,0 +1,178 @@ +""" +A class to deal with a command that will run a single instance of the +fio I/O exerciser + +It will return the full executable string that can be used to run a +cli command using whatever method the Benchmark chooses +""" + +from logging import Logger, getLogger +from typing import Optional + +from cli_options import CliOptions +from command.command import Command +from common import get_fqdn_cmd + +log: Logger = getLogger("cbt") + + +class FioCommand(Command): + """ + The fio command class. This class represents a single fio command + line that can be run on a local or remote client system. + """ + + REQUIRED_OPTIONS = {"ioengine": "rbd", "clientname": "admin", "invalidate": "0", "direct": "1"} + DIRECT_TRANSLATIONS: list[str] = ["numjobs", "iodepth"] + + def __init__(self, options: dict[str, str], workload_output_directory: str) -> None: + self._volume_number: int = int(options["volume_number"]) + self._total_iodepth: Optional[str] = options.get("total_iodepth", None) + self._workload_output_directory: str = workload_output_directory + super().__init__(options) + + def _parse_global_options(self, options: dict[str, str]) -> CliOptions: + global_options: CliOptions = CliOptions(options) + + return global_options + + def _parse_options(self, options: dict[str, str]) -> CliOptions: + fio_cli_options: CliOptions = CliOptions() + + fio_cli_options.update(self.REQUIRED_OPTIONS) + for option in self.DIRECT_TRANSLATIONS: + # Below only needed if testing? 
+ fio_cli_options[option] = options[option] if option in options.keys() else "" + + fio_cli_options["rw"] = options.get("mode", "write") + fio_cli_options["output-format"] = options.get("fio_out_format", "json,normal") + fio_cli_options["pool"] = options.get("poolname", "cbt-librbdfio") + fio_cli_options["numjobs"] = options.get("numjobs", "1") + fio_cli_options["bs"] = options.get("op_size", "4194304") + fio_cli_options["end_fsync"] = f"{options.get('end_fsync', 0)}" + + if options.get("random_distribution", None) is not None: + fio_cli_options["random_distribution"] = options.get("random_distribution", None) + + if options.get("log_avg_msec", None) is not None: + fio_cli_options["log_avg_msec"] = options.get("log_avg_msec", None) + + if options.get("time", None) is not None: + fio_cli_options["runtime"] = options.get("time", None) + + if options.get("ramp", None) is not None: + fio_cli_options["ramp_time"] = options.get("ramp", None) + + if options.get("rate_iops", None) is not None: + fio_cli_options["rate_iops"] = options.get("rate_iops", None) + + if bool(options.get("time_based", False)) is True: + fio_cli_options["time_based"] = "" + + if bool(options.get("no_sudo", False)) is False: + fio_cli_options["sudo"] = "" + + if options.get("norandommap", None) is not None: + fio_cli_options["norandommap"] = "" + + if "recovery_test" in options.keys(): + fio_cli_options["time_based"] = "" + + # Secondary options + if fio_cli_options["rw"] == "readwrite" or fio_cli_options["rw"] == "randrw": + read_percent: str = options.get("rwmixread", "50") + write_percent: str = f"{100 - int(read_percent)}" + fio_cli_options["rwmixread"] = read_percent + fio_cli_options["rwmixwrite"] = write_percent + + rbd_name: str = options.get("rbdname", "") + if rbd_name == "": + rbd_name = f"cbt-fio-`{get_fqdn_cmd()}`-{self._volume_number:d}" # type: ignore[no-untyped-call] + fio_cli_options["rbdname"] = rbd_name + + if bool(options.get("log_iops", True)): + fio_cli_options["log_iops"] = "" 
+ + if bool(options.get("log_bw", True)): + fio_cli_options["log_bw"] = "" + + if bool(options.get("log_lat", True)): + fio_cli_options["log_lat"] = "" + + processes_per_volume: int = int(options.get("procs_per_volume", 1)) + + fio_cli_options["name"] = self._get_job_name(options["name"], processes_per_volume) + + return fio_cli_options + + def _generate_full_command(self) -> str: + command: str = "" + + output_file: str = f"{self._generate_output_directory_path()}/output.{self._volume_number:d}" + self._setup_logging(output_file) + + if "sudo" in self._options.keys(): + command += "sudo " + del self._options["sudo"] + + command += f"{self._executable} " + + for name, value in self._options.items(): + if name == "name" and value is not None: + for jobname in value.strip().split(" "): + command += f"--{name}={jobname} " + elif value != "": + command += f"--{name}={value} " + else: + command += f"--{name} " + + command += f"> {output_file}" + + return command + + def _generate_output_directory_path(self) -> str: + """ + For an fio command the output format is: + numjobs-/total_iodepth-/iodepth- + if total_iodepth was used in the options, otherwise: + numjobs-/iodepth- + """ + output_path: str = f"{self._workload_output_directory}/numjobs-{int(str(self._options['numjobs'])):03d}/" + + if self._total_iodepth is not None: + output_path += f"total_iodepth-{self._total_iodepth}/" + + output_path += f"iodepth-{int(str(self._options['iodepth'])):03d}" + + return output_path + + def _get_job_name(self, parent_workload_name: str, processes_per_volume: int) -> str: + """ + Get the name for this job to give to fio + This is of the format: + + cbt--- + """ + + job_name: str = "" + + for process_number in range(processes_per_volume): + job_name += f"cbt-fio-{parent_workload_name}-`hostname`-file-{process_number} " + + return job_name + + def _setup_logging(self, output_file_name: str) -> None: + """ + Set up the log paths if required + """ + if "log_iops" in self._options.keys(): 
+ self._options.pop("log_iops") + self._options["write_iops_log"] = output_file_name + + if "log_bw" in self._options.keys(): + self._options.pop("log_bw") + self._options["write_bw_log"] = output_file_name + + if "log_lat" in self._options.keys(): + self._options.pop("log_lat") + self._options["write_lat_log"] = output_file_name diff --git a/workloads/__init__.py b/workloads/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workloads/workload.py b/workloads/workload.py new file mode 100644 index 00000000..ea3ed3d2 --- /dev/null +++ b/workloads/workload.py @@ -0,0 +1,183 @@ +""" +The workload class that encapsulates a single workload that can be +run by any benchmark +""" + +from logging import Logger, getLogger +from typing import Generator, Optional, Union + +from benchmarkfactory import all_configs # pyright: ignore [reportUnknownVariableType] +from command.command import Command +from command.fio_command import FioCommand + +WORKLOAD_TYPE = dict[str, Union[str, list[str]]] +WORKLOAD_YAML_TYPE = dict[str, WORKLOAD_TYPE] + +log: Logger = getLogger("cbt") + + +class Workload: + """ + A single workload that is expected to be run against a client system. + This workload can contain one or more Command objects, representing a + single invocation of an I/O exerciser. 
Typically there will be a Command + object for each volume_per_client, as specified in the configuration + yaml + """ + + def __init__(self, name: str, options: WORKLOAD_TYPE, base_run_directory: str) -> None: + self._name: str = name + self._base_run_directory: str = base_run_directory + self._commands: list[Command] = [] + self._parent_benchmark_type: Optional[str] = None + self._all_options: WORKLOAD_TYPE = options.copy() + self._executable_path: str + + def get_commands(self) -> Generator[str, None, None]: + self._create_commands_from_options() + + if self._commands == []: + log.warning("There are no commands for workload %s", self._name) + return + + for command in self._commands: + command.set_executable(self._executable_path) + yield command.get() + return + + def set_executable(self, executable_path: str) -> None: + """ + Set the executable to be used for the workload + """ + self._executable_path = executable_path + + def set_benchmark_type(self, parent_benchmark_type: str) -> None: + """ + Set the type of the parent benchmark for this workload. 
This + determines which Command sub class we need to instantiate for this + particular workload + """ + self._parent_benchmark_type = parent_benchmark_type + + def add_global_options(self, global_options: WORKLOAD_TYPE) -> None: + """ + Set any options for the workload that are not included in the + 'workloads' section of the configuration yaml + + if a value exists already in the configuration then ignore it + """ + for key, value in global_options.items(): + if key not in self._all_options.keys(): + self._all_options[key] = value + + def create_output_directory(self) -> None: + """ + create the results directory for the test run + """ + pass + + def _create_command_class(self, options: dict[str, str]) -> Command: + """ + Create the concrete command classes for each command for this workload + """ + if self._parent_benchmark_type == "fio": + return FioCommand(options, f"{self._base_run_directory}/{self._name}") + + log.error("Benchmark Class %s is not supported by workloads yet", self._parent_benchmark_type) + raise NotImplementedError + + def _create_commands_from_options(self) -> None: + unique_options: dict[str, str] + for unique_options in all_configs(self._all_options): # type: ignore[no-untyped-call] + iodepth_key: str = self._get_iodepth_key(list(unique_options.keys())) + unique_options["iodepth_key"] = iodepth_key + iodepth: int = int(unique_options.get(iodepth_key, 16)) + number_of_volumes: int = int(unique_options.get("volumes_per_client", 1)) + iodepth_per_volume: dict[int, int] = self._calculate_iodepth_per_volume( + number_of_volumes, iodepth, iodepth_key + ) + unique_options["name"] = self._name + + for volume_number in iodepth_per_volume.keys(): + unique_options["iodepth"] = f"{iodepth_per_volume[volume_number]}" + unique_options["volume_number"] = f"{volume_number}" + self._commands.append(self._create_command_class(unique_options)) + + # I htink the above will overwrite the iodepth to be used for the command, + # while still retaining a 
total_iodepth value if one is passed. We can then + # use the total_iodepth value to add into the output_dir so we can read it + + def _get_iodepth_key(self, configuration_keys: list[str]) -> str: + """ + Get the range of iodepth values to use for this test. This will either + be the list of total_iodepth values if the total_iodepth key exists, + or the iodepth value if it does not + """ + iodepth_key: str = "iodepth" + if "total_iodepth" in configuration_keys: + iodepth_key = "total_iodepth" + + return iodepth_key + + def _calculate_iodepth_per_volume(self, number_of_volumes: int, iodepth: int, iodepth_key: str) -> dict[int, int]: + """ + Calculate the desired iodepth per volume for a single benchmark run. + If total_iodepth is to be used calculate what the iodepth per volume + should be and return that, otherwise return the iodepth value for each + volume + """ + if iodepth_key == "total_iodepth": + return self._calculate_iodepth_per_volume_from_total_iodepth(number_of_volumes, iodepth) + else: + return self._set_iodepth_for_every_volume(number_of_volumes, iodepth) + + def _calculate_iodepth_per_volume_from_total_iodepth( + self, number_of_volumes: int, total_desired_iodepth: int + ) -> dict[int, int]: + """ + Given the total desired iodepth and the number of volumes from the + configuration yaml file, calculate the iodepth for each volume + + If the iodepth specified in total_iodepth is too small to allow + an iodepth of 1 per volume, then reduce the number of volumes + used to allow an iodepth of 1 per volume. 
+ """ + queue_depths: dict[int, int] = {} + + if number_of_volumes > total_desired_iodepth: + log.warning( + "The total iodepth requested: %s is less than 1 per volume (%s)", + total_desired_iodepth, + number_of_volumes, + ) + log.warning( + "Number of volumes per client will be reduced from %s to %s", number_of_volumes, total_desired_iodepth + ) + number_of_volumes = total_desired_iodepth + + iodepth_per_volume: int = total_desired_iodepth // number_of_volumes + remainder: int = total_desired_iodepth % number_of_volumes + + for volume_id in range(number_of_volumes): + iodepth: int = iodepth_per_volume + + if remainder > 0: + iodepth += 1 + remainder -= 1 + queue_depths[volume_id] = iodepth + + return queue_depths + + def _set_iodepth_for_every_volume(self, number_of_volumes: int, iodepth: int) -> dict[int, int]: + """ + Given an iodepth value and the number of volumes return a dictionary + that contains the desired iodepth value for each volume + """ + queue_depths: dict[int, int] = {} + for volume_id in range(number_of_volumes): + queue_depths[volume_id] = iodepth + + return queue_depths + + def __str__(self) -> str: + return f"Name: {self._name}." 
diff --git a/workloads/workloads.py b/workloads/workloads.py new file mode 100644 index 00000000..61bdbf96 --- /dev/null +++ b/workloads/workloads.py @@ -0,0 +1,114 @@ +""" +The workloads class that contains all the Workloads for a given Benchmark run +""" + +from logging import Logger, getLogger +from typing import Generator, Union + +from workloads.workload import WORKLOAD_TYPE, WORKLOAD_YAML_TYPE, Workload + +BENCHMARK_CONFIGURATION_TYPE = dict[ + str, dict[str, Union[str, list[str], dict[str, dict[str, Union[str, list[int]]]], dict[str, str]]] +] + +log: Logger = getLogger("cbt") + + +class Workloads: + def __init__(self, benchmark_configuration: BENCHMARK_CONFIGURATION_TYPE, base_run_directory: str) -> None: + self._benchmark_configuration: BENCHMARK_CONFIGURATION_TYPE = benchmark_configuration + self._base_run_directory: str = base_run_directory + + self._global_options: WORKLOAD_TYPE = self._get_global_options_from_configuration(benchmark_configuration) + + self._benchmark_type: str = "" + self._executable: str = "" + self._workloads: list[Workload] = [] + + workloads_configuration: WORKLOAD_YAML_TYPE = benchmark_configuration.get("workloads", {}) # type: ignore[assignment] + self._create_configurations(workloads_configuration) + + def exist(self) -> bool: + """ + Return True if there is a workload configuration, otherwise False + + Can be used to check if we want to run a workload-style test run + or a normal style test run + """ + return self._workloads != [] + + def get(self) -> Generator[Workload, None, None]: + """ + Return all the workloads, one at a time + """ + if self._workloads == []: + return + + if not self._benchmark_type: + log.error("Benchmark type has not been set. 
Run set_benchmark_type() to set it") + + if not self._executable: + log.error("Executable path has not been set Run set_executable() to set it") + + for workload in self._workloads: + workload.set_benchmark_type(self._benchmark_type) + workload.set_executable(self._executable) + # workload.create_output_directory() + yield workload + return + + def get_all_commands(self) -> Generator[str, None, None]: + """ + Yield the string for each of the commands required to run all + the workloads we know about + TODO: Do we want this????? + """ + for workload in self._workloads: + for command in workload.get_commands(): + yield command + + def set_benchmark_type(self, benchmark_type: str) -> None: + """ + set the benchmark type that will be used to run the workload + + This must be done by the benchmark before it attempts to run + any commands + """ + self._benchmark_type = benchmark_type + + def set_executable(self, executable_path: str) -> None: + """ + Set the executable to be used for the + """ + self._executable = executable_path + + def _create_configurations(self, workload_json: WORKLOAD_YAML_TYPE) -> None: + """ + Get the options needed to construct the benchmark command to run the test + """ + for workload_name, workload_options in workload_json.items(): + workload = Workload(workload_name, workload_options, self._base_run_directory) + workload.add_global_options(self._global_options) + # workload.set_benchmark_type(self._benchmark_type) + + self._workloads.append(workload) + + def _get_global_options_from_configuration(self, configuration: BENCHMARK_CONFIGURATION_TYPE) -> WORKLOAD_TYPE: + """ + Get any configuration options from the yaml that are not workload + specific + """ + global_options: WORKLOAD_TYPE = {} + + for option_name, value in configuration.items(): + if option_name == "workloads" or option_name == "prefill": + # prefill is not an option for workloads as it is used in the Benchmark prefill() + # method. 
+ # Workloads we also want to ignore + pass + elif isinstance(value, list): + global_options[option_name] = value + else: + global_options[option_name] = f"{value}" + + return global_options From a9101be138af88fb8c7e21b8f482453169a40f24 Mon Sep 17 00:00:00 2001 From: Chris Harris Date: Thu, 24 Apr 2025 11:59:09 +0100 Subject: [PATCH 2/9] Refactoring workloads: new classes Signed-off-by: Chris Harris(harriscr@uk.ibm.com) --- command/fio_command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/command/fio_command.py b/command/fio_command.py index e880d9d6..a96d8985 100644 --- a/command/fio_command.py +++ b/command/fio_command.py @@ -157,7 +157,7 @@ def _get_job_name(self, parent_workload_name: str, processes_per_volume: int) -> job_name: str = "" for process_number in range(processes_per_volume): - job_name += f"cbt-fio-{parent_workload_name}-`hostname`-file-{process_number} " + job_name += f"cbt-fio-{parent_workload_name}-`{get_fqdn_cmd()}`-file-{process_number} " # type: ignore[no-untyped-call] return job_name From 8345b8ded7127ff7bef7012e4e5b32a53aa9ce65 Mon Sep 17 00:00:00 2001 From: Chris Harris Date: Thu, 24 Apr 2025 11:59:09 +0100 Subject: [PATCH 3/9] Refactoring workloads: new classes Signed-off-by: Chris Harris(harriscr@uk.ibm.com) --- command/fio_command.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/command/fio_command.py b/command/fio_command.py index a96d8985..ed976597 100644 --- a/command/fio_command.py +++ b/command/fio_command.py @@ -41,7 +41,6 @@ def _parse_options(self, options: dict[str, str]) -> CliOptions: fio_cli_options.update(self.REQUIRED_OPTIONS) for option in self.DIRECT_TRANSLATIONS: - # Below only needed if testing? 
fio_cli_options[option] = options[option] if option in options.keys() else "" fio_cli_options["rw"] = options.get("mode", "write") @@ -49,7 +48,7 @@ def _parse_options(self, options: dict[str, str]) -> CliOptions: fio_cli_options["pool"] = options.get("poolname", "cbt-librbdfio") fio_cli_options["numjobs"] = options.get("numjobs", "1") fio_cli_options["bs"] = options.get("op_size", "4194304") - fio_cli_options["end_fsync"] = f"{options.get('end_fsync', 0)}" + fio_cli_options["end_fsync"] = f"{options.get('end_fsync', '0')}" if options.get("random_distribution", None) is not None: fio_cli_options["random_distribution"] = options.get("random_distribution", None) From cf53b980a380237e13bcf9e42f198e9555c2b94c Mon Sep 17 00:00:00 2001 From: Chris Harris Date: Thu, 24 Apr 2025 11:59:09 +0100 Subject: [PATCH 4/9] Refactoring workloads: new classes Signed-off-by: Chris Harris --- cli_options.py | 6 ++--- command/command.py | 2 +- command/fio_command.py | 2 +- workloads/workload.py | 57 +++++++++++++++++++++++++++++++++++------- workloads/workloads.py | 39 ++++++++++++++++++++--------- 5 files changed, 80 insertions(+), 26 deletions(-) diff --git a/cli_options.py b/cli_options.py index e1bdbb4e..a42a5cd6 100644 --- a/cli_options.py +++ b/cli_options.py @@ -19,7 +19,7 @@ def __setitem__(self, key: str, value: Optional[str]) -> None: if key not in self.data.keys(): self.data[key] = value else: - log.warning("Not adding %s:%s to configuration. A value is already set", key, value) + log.debug("Not adding %s:%s to configuration. A value is already set", key, value) def __update__(self, key_value_pair: tuple[str, str]) -> None: """ @@ -30,7 +30,7 @@ def __update__(self, key_value_pair: tuple[str, str]) -> None: if key not in self.data.keys(): self.data[key] = value else: - log.warning("Not Updating %s:%s in configuration. Value already exists", key, value) + log.debug("Not Updating %s:%s in configuration. 
Value already exists", key, value) def __getitem__(self, key: str) -> Optional[str]: """ @@ -40,7 +40,7 @@ def __getitem__(self, key: str) -> Optional[str]: if key in self.data.keys(): return self.data[key] else: - log.warning("Key %s does not exist in configuration", key) + log.debug("Key %s does not exist in configuration", key) return None def clear(self) -> None: diff --git a/command/command.py b/command/command.py index a5435c8c..9eb90184 100644 --- a/command/command.py +++ b/command/command.py @@ -73,7 +73,7 @@ def get_output_directory(self) -> str: """ Return the output directory that will be used for this command """ - return self._output_directory + return self._generate_output_directory_path() def set_executable(self, executable_path: str) -> None: """ diff --git a/command/fio_command.py b/command/fio_command.py index ed976597..e4bbb5df 100644 --- a/command/fio_command.py +++ b/command/fio_command.py @@ -156,7 +156,7 @@ def _get_job_name(self, parent_workload_name: str, processes_per_volume: int) -> job_name: str = "" for process_number in range(processes_per_volume): - job_name += f"cbt-fio-{parent_workload_name}-`{get_fqdn_cmd()}`-file-{process_number} " # type: ignore[no-untyped-call] + job_name += f"cbt-fio-{parent_workload_name}-`hostname`-file-{process_number} " return job_name diff --git a/workloads/workload.py b/workloads/workload.py index ea3ed3d2..a9b0bc2a 100644 --- a/workloads/workload.py +++ b/workloads/workload.py @@ -6,7 +6,7 @@ from logging import Logger, getLogger from typing import Generator, Optional, Union -from benchmarkfactory import all_configs # pyright: ignore [reportUnknownVariableType] +from benchmarkfactory import all_configs # pyright: ignore[reportUnknownVariableType] from command.command import Command from command.fio_command import FioCommand @@ -32,9 +32,15 @@ def __init__(self, name: str, options: WORKLOAD_TYPE, base_run_directory: str) - self._parent_benchmark_type: Optional[str] = None self._all_options: WORKLOAD_TYPE 
= options.copy() self._executable_path: str + self._script: str = f"{options.get('pre_workload_script', '')}" def get_commands(self) -> Generator[str, None, None]: - self._create_commands_from_options() + """ + Return all I/O exerciser commands that need to be run to fully execute + this workload + """ + if self._commands == []: + self._create_commands_from_options() if self._commands == []: log.warning("There are no commands for workload %s", self._name) @@ -45,6 +51,44 @@ def get_commands(self) -> Generator[str, None, None]: yield command.get() return + def get_output_directories(self) -> Generator[str, None, None]: + """ + For each individual run of the I/O exerciser get the output directory + for the results. + + Eventually the idea is to change this to 'create_output_directories()' + and have the workload be able to create what is needed, but that will + require more re-factoring in the CBT code that is outwith the scope of + this change + """ + if self._commands == []: + self._create_commands_from_options() + + for command in self._commands: + yield command.get_output_directory() + + def get_name(self) -> str: + """ + Return the name of this workload + """ + return self._name + + def has_script(self) -> bool: + """ + True if there is a script for this workload, otherwise false + """ + return self._script != "" + + def get_script_command(self) -> Optional[str]: + """ + If the yaml specifies a script to be run before this workload then + return the command line invocation, otherwise return None + """ + if self._script == "": + return None + + return self._script + def set_executable(self, executable_path: str) -> None: """ Set the executable to be used for the workload @@ -70,12 +114,6 @@ def add_global_options(self, global_options: WORKLOAD_TYPE) -> None: if key not in self._all_options.keys(): self._all_options[key] = value - def create_output_directory(self) -> None: - """ - create the results directory for the test run - """ - pass - def 
_create_command_class(self, options: dict[str, str]) -> Command: """ Create the concrete command classes for each command for this workload @@ -103,9 +141,10 @@ def _create_commands_from_options(self) -> None: unique_options["volume_number"] = f"{volume_number}" self._commands.append(self._create_command_class(unique_options)) - # I htink the above will overwrite the iodepth to be used for the command, + # The above will overwrite the iodepth to be used for the command, # while still retaining a total_iodepth value if one is passed. We can then # use the total_iodepth value to add into the output_dir so we can read it + # in post-processing. def _get_iodepth_key(self, configuration_keys: list[str]) -> str: """ diff --git a/workloads/workloads.py b/workloads/workloads.py index 61bdbf96..fbef76bf 100644 --- a/workloads/workloads.py +++ b/workloads/workloads.py @@ -3,8 +3,10 @@ """ from logging import Logger, getLogger -from typing import Generator, Union +from typing import Union +from common import CheckedPopen, CheckedPopenLocal, make_remote_dir, pdsh # pyright: ignore[reportUnknownVariableType] +from settings import getnodes # pyright: ignore[reportUnknownVariableType] from workloads.workload import WORKLOAD_TYPE, WORKLOAD_YAML_TYPE, Workload BENCHMARK_CONFIGURATION_TYPE = dict[ @@ -37,35 +39,48 @@ def exist(self) -> bool: """ return self._workloads != [] - def get(self) -> Generator[Workload, None, None]: + def run(self) -> None: """ - Return all the workloads, one at a time + Run each workload in turn """ if self._workloads == []: + log.error("No workloads to run %s", self._workloads) return if not self._benchmark_type: log.error("Benchmark type has not been set. 
Run set_benchmark_type() to set it") + return if not self._executable: log.error("Executable path has not been set Run set_executable() to set it") + return + processes: list[Union[CheckedPopen, CheckedPopenLocal]] = [] for workload in self._workloads: workload.set_benchmark_type(self._benchmark_type) workload.set_executable(self._executable) - # workload.create_output_directory() - yield workload - return - def get_all_commands(self) -> Generator[str, None, None]: + script_command = workload.get_script_command() + if workload.has_script() and script_command is not None: + log.debug("Scheduling script %s to run before workload %s", script_command, workload.get_name()) + pdsh(getnodes("clients"), script_command).wait() # type: ignore[no-untyped-call] + for output_directory in workload.get_output_directories(): + make_remote_dir(output_directory) # type: ignore[no-untyped-call] + for fio_command in workload.get_commands(): + processes.append(pdsh(getnodes("clients"), fio_command)) # type: ignore[no-untyped-call] + for process in processes: + process.wait() # type: ignore[no-untyped-call] + + log.info("== Workloads completed ==") + + def get_names(self) -> str: """ - Yield the string for each of the commands required to run all - the workloads we know about - TODO: Do we want this????? 
+ Get the names for all the workloads """ + names: str = "" for workload in self._workloads: - for command in workload.get_commands(): - yield command + names += f"{workload.get_name()} " + return names def set_benchmark_type(self, benchmark_type: str) -> None: """ From 3da638d4b4b47e4f203dba5a8658d620d5154508 Mon Sep 17 00:00:00 2001 From: Chris Harris Date: Thu, 24 Apr 2025 11:59:09 +0100 Subject: [PATCH 5/9] Refactoring workloads: new classes Signed-off-by: Chris Harris --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 52e4e611..9ce46e5c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ *.pyc *.pyo +*.venv \ No newline at end of file From ae8d23aedf71494b24a6702052ab07294b3ec2a2 Mon Sep 17 00:00:00 2001 From: Chris Harris Date: Thu, 24 Apr 2025 11:59:09 +0100 Subject: [PATCH 6/9] Refactoring workloads: new classes Signed-off-by: Chris Harris --- cli_options.py | 9 ++++++--- command/fio_command.py | 2 +- workloads/workload.py | 14 +++++++------- workloads/workloads.py | 20 ++++++++++++++++---- 4 files changed, 30 insertions(+), 15 deletions(-) diff --git a/cli_options.py b/cli_options.py index a42a5cd6..3ffbe555 100644 --- a/cli_options.py +++ b/cli_options.py @@ -11,6 +11,12 @@ class CliOptions(UserDict[str, Optional[str]]): + """ + Thic class encapsulates a set of CLI options that can be passed to a + command line invocation. It is based on a python dictionary, but with + behaviour modified so that duplicate entries do not update the original. + """ + def __setitem__(self, key: str, value: Optional[str]) -> None: """ Add an entry to the configuration. 
@@ -48,6 +54,3 @@ def clear(self) -> None: Clear the configuration """ self.data = {} - - def __str__(self) -> str: - return super().__str__() diff --git a/command/fio_command.py b/command/fio_command.py index e4bbb5df..a217cbf1 100644 --- a/command/fio_command.py +++ b/command/fio_command.py @@ -162,7 +162,7 @@ def _get_job_name(self, parent_workload_name: str, processes_per_volume: int) -> def _setup_logging(self, output_file_name: str) -> None: """ - Set up the log paths if required + Set up the additional fio log paths if required """ if "log_iops" in self._options.keys(): self._options.pop("log_iops") diff --git a/workloads/workload.py b/workloads/workload.py index a9b0bc2a..b152c4d5 100644 --- a/workloads/workload.py +++ b/workloads/workload.py @@ -10,8 +10,8 @@ from command.command import Command from command.fio_command import FioCommand -WORKLOAD_TYPE = dict[str, Union[str, list[str]]] -WORKLOAD_YAML_TYPE = dict[str, WORKLOAD_TYPE] +WORKLOAD_TYPE = dict[str, Union[str, list[str]]] # pylint: disable=["invalid-name"] +WORKLOAD_YAML_TYPE = dict[str, WORKLOAD_TYPE] # pylint: disable=["invalid-name"] log: Logger = getLogger("cbt") @@ -39,10 +39,10 @@ def get_commands(self) -> Generator[str, None, None]: Return all I/O exerciser commands that need to be run to fully execute this workload """ - if self._commands == []: + if not self._commands: self._create_commands_from_options() - if self._commands == []: + if not self._commands: log.warning("There are no commands for workload %s", self._name) return @@ -61,7 +61,7 @@ def get_output_directories(self) -> Generator[str, None, None]: require more re-factoring in the CBT code that is outwith the scope of this change """ - if self._commands == []: + if not self._commands: self._create_commands_from_options() for command in self._commands: @@ -136,8 +136,8 @@ def _create_commands_from_options(self) -> None: ) unique_options["name"] = self._name - for volume_number in iodepth_per_volume.keys(): - 
unique_options["iodepth"] = f"{iodepth_per_volume[volume_number]}" + for volume_number, iodepth in iodepth_per_volume.items(): + unique_options["iodepth"] = f"{iodepth}" unique_options["volume_number"] = f"{volume_number}" self._commands.append(self._create_command_class(unique_options)) diff --git a/workloads/workloads.py b/workloads/workloads.py index fbef76bf..cbb41a57 100644 --- a/workloads/workloads.py +++ b/workloads/workloads.py @@ -9,7 +9,7 @@ from settings import getnodes # pyright: ignore[reportUnknownVariableType] from workloads.workload import WORKLOAD_TYPE, WORKLOAD_YAML_TYPE, Workload -BENCHMARK_CONFIGURATION_TYPE = dict[ +BENCHMARK_CONFIGURATION_TYPE = dict[ # pylint: disable = ["invalid-name"] str, dict[str, Union[str, list[str], dict[str, dict[str, Union[str, list[int]]]], dict[str, str]]] ] @@ -17,6 +17,17 @@ class Workloads: + """ + A class that holds a collection of workloads that are used for a particular + benchmark type. + + It parses the benchmark type configuration for a workloads section, and + for each workload named therein it will create a workload object. 
+ + set_executable() and set_benchmark_type() must be called on the Workloads + before attempting to use the run() method + + def __init__(self, benchmark_configuration: BENCHMARK_CONFIGURATION_TYPE, base_run_directory: str) -> None: self._benchmark_configuration: BENCHMARK_CONFIGURATION_TYPE = benchmark_configuration self._base_run_directory: str = base_run_directory @@ -37,13 +48,14 @@ def exist(self) -> bool: Can be used to check if we want to run a workload-style test run or a normal style test run """ - return self._workloads != [] + return bool(self._workloads) def run(self) -> None: """ - Run each workload in turn + Run all the I/O exerciser commands for each workload in turn, including + any scripts that should be run between workloads """ - if self._workloads == []: + if not self._workloads: log.error("No workloads to run %s", self._workloads) return From 677e09bc6b061de67cbb93b729e862612237fc93 Mon Sep 17 00:00:00 2001 From: Chris Harris Date: Thu, 29 May 2025 15:47:49 +0100 Subject: [PATCH 7/9] Updates for review comments Signed-off-by: Chris Harris --- .gitignore | 5 +- cli_options.py | 2 +- command/command.py | 5 -- command/fio_command.py | 51 +++++++++------- command/rbd_fio_command.py | 41 +++++++++++++ tests/test_cli_options.py | 120 +++++++++++++++++++++++++++++++++++++ workloads/workload.py | 80 +++++++++++++------------ workloads/workloads.py | 10 ++-- 8 files changed, 242 insertions(+), 72 deletions(-) create mode 100644 command/rbd_fio_command.py create mode 100644 tests/test_cli_options.py diff --git a/.gitignore b/.gitignore index 9ce46e5c..afd955ff 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ *.pyc *.pyo -*.venv \ No newline at end of file +*.venv +*.code-workspace +.devcontainer +pyproject.toml \ No newline at end of file diff --git a/cli_options.py b/cli_options.py index 3ffbe555..6f8ff335 100644 --- a/cli_options.py +++ b/cli_options.py @@ -14,7 +14,7 @@ class CliOptions(UserDict[str, Optional[str]]): """ Thic class 
encapsulates a set of CLI options that can be passed to a command line invocation. It is based on a python dictionary, but with - behaviour modified so that duplicate entries do not update the original. + behaviour modified so that duplicate keys do not update the original. """ def __setitem__(self, key: str, value: Optional[str]) -> None: diff --git a/command/command.py b/command/command.py index 9eb90184..49c2bc9f 100644 --- a/command/command.py +++ b/command/command.py @@ -92,8 +92,3 @@ def update_options(self, new_options: dict[str, str]) -> None: Update the command with the new_options dictionary """ self._options.update(new_options) - for key, value in new_options.items(): - if key not in self._options.keys(): - self._options[key] = value - else: - log.debug("key %s already exists. Not overwriting", key) diff --git a/command/fio_command.py b/command/fio_command.py index a217cbf1..7c9fadec 100644 --- a/command/fio_command.py +++ b/command/fio_command.py @@ -1,36 +1,47 @@ """ A class to deal with a command that will run a single instance of the -fio I/O exerciser +FIO I/O exerciser It will return the full executable string that can be used to run a -cli command using whatever method the Benchmark chooses +cli command using whatever method the calling Benchmark chooses. + +It deals with the FIO options that are common to all I/O engine types. For +options that are specific to a particular I/O engine e.g. rbd a subclass +should be created that parses these options """ +from abc import ABCMeta, abstractmethod from logging import Logger, getLogger from typing import Optional from cli_options import CliOptions from command.command import Command -from common import get_fqdn_cmd log: Logger = getLogger("cbt") -class FioCommand(Command): +class FioCommand(Command, metaclass=ABCMeta): """ - The fio command class. This class represents a single fio command + The FIO command class. 
This class represents a single FIO command line that can be run on a local or remote client system. """ - REQUIRED_OPTIONS = {"ioengine": "rbd", "clientname": "admin", "invalidate": "0", "direct": "1"} - DIRECT_TRANSLATIONS: list[str] = ["numjobs", "iodepth"] + _REQUIRED_OPTIONS = {"invalidate": "0", "direct": "1"} + _DIRECT_TRANSLATIONS: list[str] = ["numjobs", "iodepth"] def __init__(self, options: dict[str, str], workload_output_directory: str) -> None: - self._volume_number: int = int(options["volume_number"]) + self._target_number: int = int(options["target_number"]) self._total_iodepth: Optional[str] = options.get("total_iodepth", None) self._workload_output_directory: str = workload_output_directory super().__init__(options) + @abstractmethod + def _parse_ioengine_specific_parameters(self, options: dict[str, str]) -> dict[str, str]: + """ + Get any options that are specific to the I/O engine being used + for this fio run and add them to the CliOptons for this workload + """ + def _parse_global_options(self, options: dict[str, str]) -> CliOptions: global_options: CliOptions = CliOptions(options) @@ -39,13 +50,14 @@ def _parse_global_options(self, options: dict[str, str]) -> CliOptions: def _parse_options(self, options: dict[str, str]) -> CliOptions: fio_cli_options: CliOptions = CliOptions() - fio_cli_options.update(self.REQUIRED_OPTIONS) - for option in self.DIRECT_TRANSLATIONS: + fio_cli_options.update(self._parse_ioengine_specific_parameters(options)) + fio_cli_options.update(self._REQUIRED_OPTIONS) + for option in self._DIRECT_TRANSLATIONS: fio_cli_options[option] = options[option] if option in options.keys() else "" fio_cli_options["rw"] = options.get("mode", "write") fio_cli_options["output-format"] = options.get("fio_out_format", "json,normal") - fio_cli_options["pool"] = options.get("poolname", "cbt-librbdfio") + fio_cli_options["numjobs"] = options.get("numjobs", "1") fio_cli_options["bs"] = options.get("op_size", "4194304") 
fio_cli_options["end_fsync"] = f"{options.get('end_fsync', '0')}" @@ -84,11 +96,6 @@ def _parse_options(self, options: dict[str, str]) -> CliOptions: fio_cli_options["rwmixread"] = read_percent fio_cli_options["rwmixwrite"] = write_percent - rbd_name: str = options.get("rbdname", "") - if rbd_name == "": - rbd_name = f"cbt-fio-`{get_fqdn_cmd()}`-{self._volume_number:d}" # type: ignore[no-untyped-call] - fio_cli_options["rbdname"] = rbd_name - if bool(options.get("log_iops", True)): fio_cli_options["log_iops"] = "" @@ -107,7 +114,7 @@ def _parse_options(self, options: dict[str, str]) -> CliOptions: def _generate_full_command(self) -> str: command: str = "" - output_file: str = f"{self._generate_output_directory_path()}/output.{self._volume_number:d}" + output_file: str = f"{self._generate_output_directory_path()}/output.{self._target_number:d}" self._setup_logging(output_file) if "sudo" in self._options.keys(): @@ -131,7 +138,7 @@ def _generate_full_command(self) -> str: def _generate_output_directory_path(self) -> str: """ - For an fio command the output format is: + For an FIO command the output format is: numjobs-/total_iodepth-/iodepth- if total_iodepth was used in the options, otherwise: numjobs-/iodepth- @@ -141,16 +148,16 @@ def _generate_output_directory_path(self) -> str: if self._total_iodepth is not None: output_path += f"total_iodepth-{self._total_iodepth}/" - output_path += f"iodepth-{int(str(self._options['iodepth'])):03d}" + output_path += f"iodepth-{int(str(self._options['iodepth'])):06d}" return output_path def _get_job_name(self, parent_workload_name: str, processes_per_volume: int) -> str: """ - Get the name for this job to give to fio + Get the name for this job to give to FIO This is of the format: - cbt--- + cbt--- """ job_name: str = "" @@ -162,7 +169,7 @@ def _get_job_name(self, parent_workload_name: str, processes_per_volume: int) -> def _setup_logging(self, output_file_name: str) -> None: """ - Set up the additional fio log paths if 
required + Set up the additional FIO log paths if required """ if "log_iops" in self._options.keys(): self._options.pop("log_iops") diff --git a/command/rbd_fio_command.py b/command/rbd_fio_command.py new file mode 100644 index 00000000..b17a9e9a --- /dev/null +++ b/command/rbd_fio_command.py @@ -0,0 +1,41 @@ +''' +A subclass of an FioCommand that deals with options that are specific to the +rbd I/O engine. + +From the FIO documentation: +https://fio.readthedocs.io/en/latest/fio_doc.html + +These are: +clientname +rbdname +clustername +pool +busy_poll + +Of these clustername and busy_poll are not currently used by CBT +''' + +from command.fio_command import FioCommand +from common import get_fqdn_cmd + + +class RbdFioCommand(FioCommand): + """ + An FioCommand type that deals specifically with running I/O using the rbd io engine. + """ + + _RBD_DEFAULT_OPTIONS: dict[str, str] = {"ioengine": "rbd", "clientname": "admin"} + + def __init__(self, options: dict[str, str], workload_output_directory: str) -> None: + super().__init__(options, workload_output_directory) + + def _parse_ioengine_specific_parameters(self, options: dict[str, str]) -> dict[str, str]: + rbd_options: dict[str, str] = self._RBD_DEFAULT_OPTIONS + + rbd_name: str = options.get("rbdname", "") + if rbd_name == "": + rbd_name = f"cbt-fio-`{get_fqdn_cmd()}`-{self._target_number:d}" # type: ignore[no-untyped-call] + rbd_options["rbdname"] = rbd_name + rbd_options["pool"] = options.get("poolname", "cbt-rbdfio") + + return rbd_options diff --git a/tests/test_cli_options.py b/tests/test_cli_options.py new file mode 100644 index 00000000..1858ee2f --- /dev/null +++ b/tests/test_cli_options.py @@ -0,0 +1,120 @@ +import unittest +from typing import Optional + +import pytest + +from cli_options import CliOptions + + +class TestCliOptions(unittest.TestCase): + """ + Tests for the cli_options class + """ + + DEFAULT_DATA: dict[str, str] = {"iodepth": "12", "mode": "randrw"} + DEFAULT_NEW_DATA: dict[str, str] = 
{"new": "value"} + + def _init_cli_options(self, options: Optional[dict[str, str]] = DEFAULT_DATA) -> CliOptions: + """ + Create an initial CliOptions class to test against + """ + return CliOptions(options) + + def _assert_cli_options_are_equal(self, actual_options: CliOptions, expected_options: CliOptions) -> None: + """ + Validate that the expected and actual values are equal + """ + # pytests assertEqual will check dictionary or list equality, including key + # names and values. + self.assertEqual(actual_options.keys(), expected_options.keys()) + self.assertEqual(len(actual_options), len(expected_options)) + self.assertEqual(actual_options, expected_options) + + def _test_update(self, update_data: dict[str, str], expected_options: dict[str, str]) -> None: + """ + Common code for testing the update function + """ + actual_options = self._init_cli_options() + actual_options.update(update_data) + self._assert_cli_options_are_equal(actual_options, self._init_cli_options(expected_options)) + + def _test_add(self, key_to_add: str, value_to_add: str, expected_options: dict[str, str]) -> None: + """ + Common code for testing the add function + """ + actual_options = self._init_cli_options() + actual_options[key_to_add] = value_to_add + self._assert_cli_options_are_equal(actual_options, self._init_cli_options(expected_options)) + + def test_update_new_value(self) -> None: + """ + Test updating the CliOptions with a new value + """ + expected_options: dict[str, str] = self.DEFAULT_DATA | self.DEFAULT_NEW_DATA + + self._test_update(self.DEFAULT_NEW_DATA, expected_options) + + def test_update_value_already_exists(self) -> None: + """ + Validate that the values in the CliOptions are not overwritten + when a set of new values is passed + """ + expected_options: dict[str, str] = self.DEFAULT_DATA + update_data: dict[str, str] = {"iodepth": "22"} + + self._test_update(update_data, expected_options) + + def test_add_new_value(self) -> None: + """ + Validate adding a key/value 
pair to the CliOptions works + """ + key: str = "added" + value: str = "value" + expected_options: dict[str, str] = {key: value} + expected_options.update(self.DEFAULT_DATA) + self._test_add(key, value, expected_options) + + def test_add_value_already_exists(self) -> None: + """ + Validate adding a key that already exists in the CliOptions + does not update the existing value for that key + """ + key: str = "mode" + value: str = "bob" + expected_options: dict[str, str] = self.DEFAULT_DATA + self._test_add(key, value, expected_options) + + def test_get_item_that_exists(self) -> None: + """ + Validate that getting an item from the CliOptions that exists + returns the correct value + """ + actual_options = self._init_cli_options() + try: + test_value: Optional[str] = actual_options["iodepth"] + self.assertEqual(test_value, self.DEFAULT_DATA["iodepth"]) + except KeyError: + pytest.fail("KeyError exception raised!") + + def test_get_item_not_exist(self) -> None: + """ + Validate that a KeyError exception is not thrown and None is returned + when asking for the value of a key that doesn't exist. 
+ """ + actual_options = self._init_cli_options() + try: + test_value: Optional[str] = actual_options["bob"] + self.assertIsNone(test_value) + except KeyError: + pytest.fail("KeyError exception raised!") + + def test_clear(self) -> None: + """ + Validate that the clear method removes all options from CliOptions + """ + actual_options = self._init_cli_options() + self.assertEqual(actual_options, CliOptions(self.DEFAULT_DATA)) + actual_options.clear() + self.assertEqual(actual_options, {}) + actual_options.update(self.DEFAULT_NEW_DATA) + self.assertEqual(actual_options, CliOptions(self.DEFAULT_NEW_DATA)) diff --git a/workloads/workload.py b/workloads/workload.py index b152c4d5..869cea5a 100644 --- a/workloads/workload.py +++ b/workloads/workload.py @@ -8,7 +8,7 @@ from benchmarkfactory import all_configs # pyright: ignore[reportUnknownVariableType] from command.command import Command -from command.fio_command import FioCommand +from command.rbd_fio_command import RbdFioCommand WORKLOAD_TYPE = dict[str, Union[str, list[str]]] # pylint: disable=["invalid-name"] WORKLOAD_YAML_TYPE = dict[str, WORKLOAD_TYPE] # pylint: disable=["invalid-name"] @@ -75,19 +75,21 @@ def get_name(self) -> str: def has_script(self) -> bool: """ - True if there is a script for this workload, otherwise false + The test plan .yaml can specify a pre_workload_script that is to be run + before every workload (see https://github.com/ceph/cbt/pull/329). + + We need to know if the user has specified a script to run before this + particular workload. 
+ return True if there is a script, otherwise False """ return self._script != "" def get_script_command(self) -> Optional[str]: """ - If the yaml specifies a script to be run before this workload then - return the command line invocation, otherwise return None + If the test plan yaml specifies a script to be run before this workload + then return the command line invocation, otherwise return None """ - if self._script == "": - return None - - return self._script + return self._script or None def set_executable(self, executable_path: str) -> None: """ @@ -118,8 +120,8 @@ def _create_command_class(self, options: dict[str, str]) -> Command: """ Create the concrete command classes for each command for this workload """ - if self._parent_benchmark_type == "fio": - return FioCommand(options, f"{self._base_run_directory}/{self._name}") + if self._parent_benchmark_type == "rbdfio": + return RbdFioCommand(options, f"{self._base_run_directory}/{self._name}") log.error("Benchmark Class %s is not supported by workloads yet", self._parent_benchmark_type) raise NotImplementedError @@ -131,14 +133,14 @@ def _create_commands_from_options(self) -> None: unique_options["iodepth_key"] = iodepth_key iodepth: int = int(unique_options.get(iodepth_key, 16)) number_of_volumes: int = int(unique_options.get("volumes_per_client", 1)) - iodepth_per_volume: dict[int, int] = self._calculate_iodepth_per_volume( + iodepth_per_target: dict[int, int] = self._calculate_iodepth_per_target( number_of_volumes, iodepth, iodepth_key ) unique_options["name"] = self._name - for volume_number, iodepth in iodepth_per_volume.items(): + for target_number, iodepth in iodepth_per_target.items(): unique_options["iodepth"] = f"{iodepth}" - unique_options["volume_number"] = f"{volume_number}" + unique_options["target_number"] = f"{target_number}" self._commands.append(self._create_command_class(unique_options)) # The above will overwrite the iodepth to be used for the command, @@ -158,63 +160,63 @@ def 
_get_iodepth_key(self, configuration_keys: list[str]) -> str: return iodepth_key - def _calculate_iodepth_per_volume(self, number_of_volumes: int, iodepth: int, iodepth_key: str) -> dict[int, int]: + def _calculate_iodepth_per_target(self, number_of_targets: int, iodepth: int, iodepth_key: str) -> dict[int, int]: """ - Calculate the desired iodepth per volume for a single benchmark run. - If total_iodepth is to be used calculate what the iodepth per volume + Calculate the desired iodepth per target for a single benchmark run. + If total_iodepth is to be used calculate what the iodepth per target should be and return that, otherwise return the iodepth value for each - volume + target """ if iodepth_key == "total_iodepth": - return self._calculate_iodepth_per_volume_from_total_iodepth(number_of_volumes, iodepth) + return self._calculate_iodepth_per_target_from_total_iodepth(number_of_targets, iodepth) else: - return self._set_iodepth_for_every_volume(number_of_volumes, iodepth) + return self._set_iodepth_for_every_target(number_of_targets, iodepth) - def _calculate_iodepth_per_volume_from_total_iodepth( - self, number_of_volumes: int, total_desired_iodepth: int + def _calculate_iodepth_per_target_from_total_iodepth( + self, number_of_targets: int, total_desired_iodepth: int ) -> dict[int, int]: """ - Given the total desired iodepth and the number of volumes from the - configuration yaml file, calculate the iodepth for each volume + Given the total desired iodepth and the number of targets from the + configuration yaml file, calculate the iodepth for each target If the iodepth specified in total_iodepth is too small to allow - an iodepth of 1 per volume, then reduce the number of volumes + an iodepth of 1 per target, then reduce the number of targets used to allow an iodepth of 1 per volume. 
""" queue_depths: dict[int, int] = {} - if number_of_volumes > total_desired_iodepth: + if number_of_targets > total_desired_iodepth: log.warning( - "The total iodepth requested: %s is less than 1 per volume (%s)", + "The total iodepth requested: %s is less than 1 per target (%s)", total_desired_iodepth, - number_of_volumes, + number_of_targets, ) log.warning( - "Number of volumes per client will be reduced from %s to %s", number_of_volumes, total_desired_iodepth + "Number of volumes per client will be reduced from %s to %s", number_of_targets, total_desired_iodepth ) - number_of_volumes = total_desired_iodepth + number_of_targets = total_desired_iodepth - iodepth_per_volume: int = total_desired_iodepth // number_of_volumes - remainder: int = total_desired_iodepth % number_of_volumes + iodepth_per_target: int = total_desired_iodepth // number_of_targets + remainder: int = total_desired_iodepth % number_of_targets - for volume_id in range(number_of_volumes): - iodepth: int = iodepth_per_volume + for target_id in range(number_of_targets): + iodepth: int = iodepth_per_target if remainder > 0: iodepth += 1 remainder -= 1 - queue_depths[volume_id] = iodepth + queue_depths[target_id] = iodepth return queue_depths - def _set_iodepth_for_every_volume(self, number_of_volumes: int, iodepth: int) -> dict[int, int]: + def _set_iodepth_for_every_target(self, number_of_targets: int, iodepth: int) -> dict[int, int]: """ - Given an iodepth value and the number of volumes return a dictionary - that contains the desired iodepth value for each volume + Given an iodepth value and the number of targets return a dictionary + that contains the desired iodepth value for each target """ queue_depths: dict[int, int] = {} - for volume_id in range(number_of_volumes): - queue_depths[volume_id] = iodepth + for target_id in range(number_of_targets): + queue_depths[target_id] = iodepth return queue_depths diff --git a/workloads/workloads.py b/workloads/workloads.py index cbb41a57..d3a26d45 100644 
--- a/workloads/workloads.py +++ b/workloads/workloads.py @@ -96,7 +96,7 @@ def get_names(self) -> str: def set_benchmark_type(self, benchmark_type: str) -> None: """ - set the benchmark type that will be used to run the workload + set the benchmark type that will be used to run the workloads This must be done by the benchmark before it attempts to run any commands @@ -105,7 +105,9 @@ def set_benchmark_type(self, benchmark_type: str) -> None: def set_executable(self, executable_path: str) -> None: """ - Set the executable to be used for the + Set the executable to be used for this set of workloads. + + This must be set by the parent benchmark before calling the run() method """ self._executable = executable_path @@ -122,7 +124,7 @@ def _create_configurations(self, workload_json: WORKLOAD_YAML_TYPE) -> None: def _get_global_options_from_configuration(self, configuration: BENCHMARK_CONFIGURATION_TYPE) -> WORKLOAD_TYPE: """ - Get any configuration options from the yaml that are not workload + Get any configuration options from the test plan .yaml that are not workload specific """ global_options: WORKLOAD_TYPE = {} @@ -131,7 +133,7 @@ def _get_global_options_from_configuration(self, configuration: BENCHMARK_CONFIG if option_name == "workloads" or option_name == "prefill": # prefill is not an option for workloads as it is used in the Benchmark prefill() # method. 
- # Workloads we also want to ignore + # workloads we also want to ignore as these will be dealt with at a later date pass elif isinstance(value, list): global_options[option_name] = value From 3092147b895d2260f9b78735455e3d3bb01b96e6 Mon Sep 17 00:00:00 2001 From: Chris Harris Date: Thu, 29 May 2025 15:47:49 +0100 Subject: [PATCH 8/9] Updates for review comments Signed-off-by: Chris Harris --- benchmarkfactory.py | 30 ------------------------------ common.py | 28 ++++++++++++++++++++++++++++ workloads/workload.py | 2 +- 3 files changed, 29 insertions(+), 31 deletions(-) diff --git a/benchmarkfactory.py b/benchmarkfactory.py index f9bdae5b..9cadb91c 100644 --- a/benchmarkfactory.py +++ b/benchmarkfactory.py @@ -1,6 +1,3 @@ -import copy -import itertools - import settings from benchmark.radosbench import Radosbench from benchmark.fio import Fio @@ -22,33 +19,6 @@ def get_all(archive, cluster, iteration): current.update(default) yield get_object(archive, cluster, benchmark, current) - -def all_configs(config): - """ - return all parameter combinations for config - config: dict - list of params - iterate over all top-level lists in config - """ - cycle_over_lists = [] - cycle_over_names = [] - default = {} - - for param, value in list(config.items()): - # acceptable applies to benchmark as a whole, no need to it to - # the set for permutation - if param == 'acceptable': - default[param] = value - elif isinstance(value, list): - cycle_over_lists.append(value) - cycle_over_names.append(param) - else: - default[param] = value - - for permutation in itertools.product(*cycle_over_lists): - current = copy.deepcopy(default) - current.update(list(zip(cycle_over_names, permutation))) - yield current - def get_object(archive, cluster, benchmark, bconfig): benchmarks = { 'nullbench': Nullbench, diff --git a/common.py b/common.py index 67becabc..f7062abf 100644 --- a/common.py +++ b/common.py @@ -1,7 +1,9 @@ """ Common classes to wrap around pdsh (parallel shell) """ +import copy 
import errno +import itertools import logging import os import signal @@ -12,6 +14,32 @@ logger = logging.getLogger("cbt") +def all_configs(config): + """ + return all parameter combinations for config + config: dict - list of params + iterate over all top-level lists in config + """ + cycle_over_lists = [] + cycle_over_names = [] + default = {} + + for param, value in list(config.items()): + # acceptable applies to benchmark as a whole, no need to it to + # the set for permutation + if param == "acceptable": + default[param] = value + elif isinstance(value, list): + cycle_over_lists.append(value) + cycle_over_names.append(param) + else: + default[param] = value + + for permutation in itertools.product(*cycle_over_lists): + current = copy.deepcopy(default) + current.update(list(zip(cycle_over_names, permutation))) + yield current + class Localhost(object): """ This class encapsulates a single dictionary with the information of the localhost diff --git a/workloads/workload.py b/workloads/workload.py index 869cea5a..618252a5 100644 --- a/workloads/workload.py +++ b/workloads/workload.py @@ -6,7 +6,7 @@ from logging import Logger, getLogger from typing import Generator, Optional, Union -from benchmarkfactory import all_configs # pyright: ignore[reportUnknownVariableType] +from common import all_configs # pyright: ignore[reportUnknownVariableType] from command.command import Command from command.rbd_fio_command import RbdFioCommand From 23c481d7e4f2bf60613f4ff69db30d9a7c0b9326 Mon Sep 17 00:00:00 2001 From: Chris Harris Date: Thu, 29 May 2025 15:47:49 +0100 Subject: [PATCH 9/9] Updates for review comments Signed-off-by: Chris Harris --- benchmarkfactory.py | 1 + 1 file changed, 1 insertion(+) diff --git a/benchmarkfactory.py b/benchmarkfactory.py index 9cadb91c..3b603c45 100644 --- a/benchmarkfactory.py +++ b/benchmarkfactory.py @@ -1,4 +1,5 @@ import settings +from common import all_configs from benchmark.radosbench import Radosbench from benchmark.fio import Fio from 
benchmark.hsbench import Hsbench