diff --git a/.gitignore b/.gitignore
index 9ea8df8..4cbde61 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,4 @@
-python_runs/__pycache__
-python_runs/IOR_config
+__pycache__
 examples/test_files
 logs
 tmp
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..f8ab174
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,7 @@
+Copyright (c) 2025 Shaker Krit, Flatiron Institute
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/README.md b/README.md
index 3761e8c..5c42b71 100644
--- a/README.md
+++ b/README.md
@@ -1,14 +1,26 @@
 # PyBenchFramework
-#throw this change out
-This application is being used with slurm but it can run on its own.
-
-User must set the environment variable "PyBench_root_dir" to run run.py. Try to use mdtest_env to start the environment for MDTEST or env_start for FIO. Don't use both at the same time.
-
-The app needs a root directory to read templates and store intermediate config files as well as log files and the like.
-
-Options can be included as arguments to 'run.py' or as fields in a YAML config file. The config file for an FIO job looks like so:
-
-
+This application is designed to be used with `slurm`, but it can also run on its own.
+
+## Installation (development)
+```sh
+python3 -m venv /path/to/venv
+source /path/to/venv/bin/activate
+git clone https://github.com/flatironinstitute/PyBenchFramework --branch cleanup
+cd PyBenchFramework
+pip install -e .
+rehash # might not be necessary, depending on shell
+```
+
+## Running
+```sh
+source /path/to/venv/bin/activate
+mpirun -n 2 pybench-run --benchmark "fio-independent-ranks" --slurm-job-number 0 --io-type "read" --config examples/YAML_templates/FIO_templates/test_kernel_rep3_ssd.yml --block-size 4M --first-node $(hostname -s) --total-node-count 1
+```
+Options can be included as arguments to `pybench-run` or as fields in a YAML config file. The config
+file for an FIO job looks like so:
+
+### Configuration
+```yaml
 slurm_job_number:
 block_size: "4M,64K,4K"
 directory: /mnt/cephtest-fi5k/test-rep3-ssd/skrit/fio
@@ -22,11 +34,11 @@
 hosts_file:
 no_scrub: 0
 unit_restart: 1
 template_path: /mnt/home/skrit/Documents/testing_clones/clone1/PyBenchFramework/examples/template/template.fio
-
+```
 
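[Editor's note] On precedence: the `mpirun ... pybench-run` example above passes `--block-size` and `--io-type` on the command line even though the YAML config can carry the same fields. The argument handler removed from `python_runs/` later in this diff merged the two sources with inline flags winning, and the sketch below illustrates that rule. It is a minimal standalone illustration, not the packaged `BenchmarkToolkit` code, and the three flags shown are only examples.

```python
import argparse
import yaml  # PyYAML, already listed as a dependency in pyproject.toml

# Sketch of the YAML-vs-CLI precedence used by the removed
# python_runs/args_handler.py: the YAML file supplies defaults, and any
# flag actually given on the command line overrides the same field.
parser = argparse.ArgumentParser()
parser.add_argument('--config')
parser.add_argument('--block-size')
parser.add_argument('--io-type')
cli = vars(parser.parse_args())

config = {}
if cli.get('config'):
    with open(cli['config']) as fh:
        config = yaml.safe_load(fh) or {}

# Inline arguments (anything not None) take precedence over config-file fields.
merged = {**config, **{k: v for k, v in cli.items() if v is not None}}
print(merged)
```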
 The config file for an mdtest job looks like so:
-
+```yaml
 mpi_ranks: 40,30,20,10,5
 directory: /mnt/cephtestk/test-ec63/skrit/mdtest
 files_per_rank: 20,10,5
@@ -38,10 +50,7 @@
 offset: 1
 write_data: '3901'
 read_data: '3901'
 node_count: 10,5,1
-
-
-One thing to note is that the main loop takes mpi ranks and multiplies it to the node count. For example, when mpi_ranks is 5 and node_count is 1, mpirun is given '-n 5' for a total of 5 mpi ranks. When mpi_ranks is 5 and node_count is 10, mpirun is given '-n 50' (5 * 10) which is distributed across 10 nodes (--map-by node -N 10).
-
-'slurm_job_number' should, at least for now, be an inline argument. Keep in mind inline arguments are likely separated by - rather than _ --help should show all arguments as they should be inline.
+```
-Under the 'submit_scripts' folder, you'll see several slurm scripts. 'mdtest.sh' is a good template for submitting mdtest jobs and the scripts starting with 'ssd...' or 'hdd...' are good templates for FIO jobs.
+## Slurm submission
+All `submit_scripts` are currently broken.
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..619060f
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,39 @@
+[build-system]
+requires = ["hatchling >= 1.26"]
+build-backend = "hatchling.build"
+
+[project]
+name = "BenchmarkToolkit"
+version = "0.0.1"
+authors = [
+  { name="Shaker Krit", email="skrit@flatironinstitute.org" },
+]
+description = "BenchmarkToolkit is a Python library designed to simplify and streamline benchmarking tasks. This library provides a high-level abstraction over various benchmarking functions, allowing developers to easily perform performance testing and analysis with minimal setup."
+readme = "README.md"
+requires-python = ">=3.9"
+classifiers = [
+    "Programming Language :: Python :: 3",
+    "Operating System :: OS Independent",
+]
+license = "MIT"
+license-files = ["LICENSE"]
+dependencies = [
+    "cryptography", # FIXME was: 42.0.8
+    "matplotlib",   # FIXME was: 3.9.2
+    "pandas",       # FIXME was: 2.2.2
+    "paramiko",     # FIXME was: 3.4.0
+    "psutil",       # FIXME was: 6.0.0
+    "PyYAML",
+    "mpi4py",
+]
+
+[tool.hatch.build.targets.wheel]
+packages = [
+    "src/BenchmarkToolkit",
+]
+
+[project.urls]
+Repository = "https://github.com/flatironinstitute/PyBenchFramework"
+
+[project.scripts]
+pybench-run = "BenchmarkToolkit.run:main"
diff --git a/python_runs/IOR_wrapper.py b/python_runs/IOR_wrapper.py deleted file mode 100644 index 4e4744d..0000000 --- a/python_runs/IOR_wrapper.py +++ /dev/null @@ -1,113 +0,0 @@ -import os -import socket -import handler_class -from datetime import datetime -import json -import sys -import benchmark_tools -import args_handler -import miscellaneous -import network_collect -import threading -import time -import re -import shutil - -def wrap_IOR(args, PyBench_root_dir): - - job_number = args['slurm_job_number'] - - current_dir = os.getcwd() - - mdtest_obj_dict = {} - #handler_class.mdtestTool() - - log_dir = f"{PyBench_root_dir}/results/iortest/{args['io_type']}/{args['platform_type']}/{job_number}" - command_log_dir = f"{log_dir}/commands" - tmp_log_dir = f"{log_dir}/tmp_files" - - hostname = socket.gethostname() - - mpi_ranks = list(benchmark_tools.split_arg_sequence(args['mpi_ranks'], "--mpi-ranks")) - filename = args['testFile'] - node_count = list(benchmark_tools.split_arg_sequence(args['node_count'], "--node-count")) - block_size = args['block_size'] - transfer_size = args['transfer_size'] - segment_count = args['segment_count'] - reorder_tasks = args['reorder_tasks'] - fsync = args['fsync'] - #if 'output_file' in args.keys(): -
# output_file = args['output_file'] - if 'output_format' in args.keys(): - output_format = args['output_format'] - if 'deadline_for_stonewalling' in args.keys(): - deadline_for_stonewalling = args['deadline_for_stonewalling'] - else: - deadline_for_stonewalling = 0 - - - #total_ranks = mpi_ranks[0]*node_count[0] - - #output_file = f"{log_dir}/ranks_per_node_{mpi_ranks[0]}_node_count_{node_count[0]}" - io_type = args['io_type'] - if 'use_existing_file' in args.keys(): - use_existing_file=args['use_existing_file'] - else: - use_existing_file = False - - if 'job_note' in args.keys(): - with open(f"{log_dir}/job_note.txt", 'w') as file: - file.write(args['job_note']) - - ior_obj_dict = {} - - print (f"SEQUENCES ARE {mpi_ranks} and nodes {node_count}") - for nodes in node_count: - for ranks in mpi_ranks: - print (f"BEGINNING OF LOOPS _---------------------- {ranks} and nodes {nodes}") - if 'unit_restart' in args: - if args['unit_restart'] == 1: - pattern = '/' - split_dir = re.split(pattern, filename) - cephtest_root = '/'+split_dir[1]+'/'+split_dir[2] - miscellaneous.restart_ceph_unit(cephtest_root) - - total_ranks = ranks * nodes - output_file = f"{log_dir}/ranks_per_node_{ranks}_node_count_{nodes}" - - ior_obj_dict[f"{ranks}_{nodes}"] = handler_class.newIORTool() - ior_obj_dict[f"{ranks}_{nodes}"].setup_command(config_file=f"{PyBench_root_dir}/{args['config']}",mpi_ranks=total_ranks,filename=filename,ranks_per_node=ranks,block_size=block_size,transfer_size=transfer_size,segment_count=segment_count,reorder_tasks=1,fsync=1,output_file=f"{output_file}",output_format=output_format, deadline_for_stonewalling=deadline_for_stonewalling,io_type=io_type,use_existing_file=use_existing_file) - - with open(f"{command_log_dir}/command_ior_{ranks}_{nodes}", 'w') as file: - file.write(f"The following is the ior command") - tmp_cmd_string = "" - for cmd_el in ior_obj_dict[f"{ranks}_{nodes}"].command: - tmp_cmd_string += f" {cmd_el}" - file.write(tmp_cmd_string) - - ior_obj_dict[f"{ranks}_{nodes}"].run() - - for nodes in node_count: - for ranks in mpi_ranks: - #Output handling - output_file = f"{log_dir}/ranks_per_node_{ranks}_node_count_{nodes}" - combined_json_log_file = f"{log_dir}/combined_{ranks}_{nodes}_{block_size}" - json_log_file = output_file - - if os.path.exists(json_log_file): - bw, iops = miscellaneous.load_ior_json_results(json_log_file, log_dir) - else: - print( f"JSON LOG FILE NOT FOUND! ------- {output_file}" ) - sys.exit() - - data = { - "nodes": nodes, - "processors": ranks, - "bw": bw, - "iops": iops - } - - with open(combined_json_log_file, 'w') as json_file: - json.dump(data, json_file, indent=4) - print(f"Data successfully written to {combined_json_log_file}") - diff --git a/python_runs/analyze_and_rebalance_load.py b/python_runs/analyze_and_rebalance_load.py deleted file mode 100644 index 55594c7..0000000 --- a/python_runs/analyze_and_rebalance_load.py +++ /dev/null @@ -1,7 +0,0 @@ -import os, sys - -def log_and_analyze_data_points(log_dir, fio_object): - - #Let's try rate-limiting to see how that effects start and end times. - print(log_dir) - print(fio_object) diff --git a/python_runs/args_handler.py b/python_runs/args_handler.py deleted file mode 100644 index 5087a86..0000000 --- a/python_runs/args_handler.py +++ /dev/null @@ -1,95 +0,0 @@ -import argparse -import yaml -import sys - -#slurm job number -# triple replicated vs EC63 - FIO directory option -# block size? Multiple or single, either way need to decide which to test -# number of jobs? Not as an argument. 
At least not yet -# job length maybe -# Sequential vs random - -def handle_arguments(): - parser = argparse.ArgumentParser(description="This script wraps FIO and facilitates long-running variable testing on an FS.") - - #universal - parser.add_argument('--config', type=str, help="Path to the YAML config file.") - parser.add_argument('--slurm-job-number', type=int, help="Slurm job number this script is running under") - parser.add_argument('--directory', type=str, help="Directory to run the test in. This is where the test files will be created.") - parser.add_argument('--first-node', type=str, help="The first node in the node list. Will execute some preperatory steps on this node") - parser.add_argument('--benchmark', type=str, help="The benchmark you want to run.") - parser.add_argument('--interface-name', type=str, help="The interface you want to monitor for inbound and outbound counters") - parser.add_argument('--total-node-count', type=str, help="The total count of nodes in the job") - parser.add_argument('--unit-restart', type=bool, help="Restart systemd unit (assumably ceph)") - parser.add_argument('--node-count', type=str, help="Sequence of nodes that the benchmark should run with. e.g '1,2,4,6,8,10'") - parser.add_argument('--job-note', type=str, help="insert a note for the job") - parser.add_argument('--wait-for-others', type=bool, help="True if nodes should wait for each other to finish iterations, false if not (1 or 0)") - parser.add_argument('--in-parts', type=bool, help="True if the sequences of benchmark arguments should be run iteratively. This usually means there will be multiple log files which will need to be taken into account in the parsing & plotting steps.") - - #ior portion - parser.add_argument('--testFile', type=str, help="File/directory to run the IOR test suite on") - parser.add_argument('--transfer-size',type=str, help="transfer size") - parser.add_argument('--segment-count', type=str, help="segment count") - parser.add_argument('--reorder-tasks', type=str, help="reorder tasks") - parser.add_argument('--fsync', type=str, help="fsync") - parser.add_argument('--output-file', type=str, help="output file") - parser.add_argument('--output-format', type=str, help="output format") - parser.add_argument('--deadline-for-stonewalling', type=int, help="Run IOR in timed mode instead of an indefinite time. All ranks stop at the same time.") - parser.add_argument('--use-existing-file', type=bool, help="Use existing test file") - - #mdtest portion - parser.add_argument('--mpi-ranks', type=str, help="Number of MPI ranks per node to use") - parser.add_argument('--files-per-rank', type=str, help="Number of files to create per rank (mdtest)") - parser.add_argument('--test-repetition', type=str, help="Number of times to repeat each test (mdtest)") - parser.add_argument('--offset', type=str, help="Should there be a node offset? (if yes, 1, else ommit flag) (mdtest)") - parser.add_argument('--write-data', type=str, help="Should mdtest write data into the files? Either 0 for no or a number of bytes (mdtest)") - parser.add_argument('--read-data', type=str, help="Should mdtest read data from the files? Either 0 for no or a number of bytes (mdtest)") - parser.add_argument('--timed', type=str, help="Specify the lower bound and upper bound of the time that the test should run for. Avoid values too close together. 
Units are seconds.") - - #fio portion - parser.add_argument('--file-size', type=str, help="Specify the size of the file FIO should write out (per process)") - parser.add_argument('--block-size', type=str, help="Block size that FIO should read/write at.") - parser.add_argument('--job-number', type=str, help="Number or sequence of number of jobs per node that FIO should run. e.g '1,5,10,15'. This is per node count in --node-count") - parser.add_argument('--time', type=int, help="Number of seconds that FIO should run for.") - parser.add_argument('--io-type', type=str, help="write, read, randwrite, randread, among others. Which IO type should FIO issue?") - parser.add_argument('--platform-type', type=str, help="Which platform are we using? This will decide output file path as well.") - parser.add_argument('--split-hosts-file', type=bool, help="Should the wrapper split the original hosts file into subsections for the different iterations?") - parser.add_argument('--hosts-file', type=str, help="Path to the intial hosts file which contains all hosts (At least FIO servers) involved.") - parser.add_argument('--no-scrub', type=bool, help="(Ceph only) set noscrub and nodeepscrub flags on the ceph system. Requires passwordless SSH to the Ceph servers") - parser.add_argument('--template-path', type=str, help="The path to the FIO template") - - args = parser.parse_args() - args_dict = vars(args) - - config_dict = {} - - if args.config: - with open(args.config, 'r') as file: - config_dict = yaml.safe_load(file) - - # Merge config and inline arguments, giving precedence to inline arguments - merged_dict = {**config_dict, **{k: v for k, v in args_dict.items() if v is not None}} - - # Set defaults if not provided - merged_dict.setdefault('time', 300) - merged_dict.setdefault('no_scrub', 0) - merged_dict.setdefault('split_hosts_file', False) - merged_dict.setdefault('interface_name', '') - merged_dict.setdefault('write_data', '0') - merged_dict.setdefault('read_data', '0') - merged_dict.setdefault('wait_for_others', 1) - - # Check for required arguments - #Trying a run without a hosts file to see if independent runs work - #required_args = ['block_size', 'directory', 'io_type', 'platform_type', 'job_number', 'node_count', 'hosts_file', 'template_path'] - #required_args = ['block_size', 'directory', 'io_type', 'platform_type', 'job_number', 'node_count', 'template_path', 'benchmark'] - required_args = [ 'io_type', 'platform_type', 'benchmark'] - missing_args = [arg for arg in required_args if arg not in merged_dict or merged_dict[arg] is None] - - #print (f"{merged_dict['write_data']} {merged_dict['write_data']}") - if missing_args and not merged_dict["not_taken_into_account"]["in_parts"]: - print(f"Error: Missing required arguments: {', '.join(missing_args)}") - sys.exit(1) - - #print(merged_dict) - return merged_dict diff --git a/python_runs/independent_runs.py b/python_runs/independent_runs.py deleted file mode 100644 index 55fba35..0000000 --- a/python_runs/independent_runs.py +++ /dev/null @@ -1,230 +0,0 @@ -import os -import fcntl -import re -import socket -import handler_class -from datetime import datetime -import sys -import json -import benchmark_tools -import args_handler -import miscellaneous -#import network_counter_collection -from network_collect import network_counter_collection -import threading -import time -import mmap -import count_lines_in_uncombined -import multi_level_barrier -from mpi4py import MPI - -def serverless_fio(args, PyBench_root_dir): - #testing mpi - # Initialize MPI - comm 
= MPI.COMM_WORLD # Default communicator (all processes) - rank = comm.Get_rank() # Get the rank (ID) of this process - size = comm.Get_size() # Get the total number of processes - #finished mpi section - - def background_network_monitor(args, job_count, node_count, block_size, PyBench_root_dir): - print("network_monitoring") - network_counter_collection.monitor_traffic(args, job_count, node_count, block_size, PyBench_root_dir) - - job_number = args['slurm_job_number'] - total_node_count = int(args['total_node_count']) - - fio_ob_dict = {} - fio_out_dict = {} - - proc = list(benchmark_tools.split_arg_sequence(args['job_number'], '--job-number')) - nodes = list(benchmark_tools.split_arg_sequence(str(args['node_count']), '--node-count')) - block_sizes = benchmark_tools.split_block_size_sequence(args['block_size'], '--block-size') - #print(f"{block_sizes} type is {type(block_sizes)}") - - config_template_path = args['template_path'] - - with open(config_template_path, 'r') as file: - original_file_contents = file.read() - - log_dir = f"{PyBench_root_dir}/results/{args['io_type']}/{args['platform_type']}/{job_number}" - command_log_dir = f"{log_dir}/commands" - - #create a map between hostnames and generic indexed hostnames - miscellaneous.create_hostname_mapping(log_dir,rank) - - if 'job_note' in args.keys(): - job_note = f"{args['job_note']} {args['io_type']}" - with open(f"{log_dir}/job_note.txt", 'w') as file: - file.write(job_note) - - if args['file_size']: - pass - else: - print( "Must specify a file size for FIO to write out..." ) - sys.exit() - - hostname = socket.gethostname() - - #put this code into miscellaneous - node_split_file = f"{log_dir}/host_list" - miscellaneous.insert_entry_and_check_completion(node_split_file, hostname, total_node_count) - - #Is this still needed (probably not)? 
- my_line_num = miscellaneous.grep_string(node_split_file, hostname) - - #as job progesses iteration count increases - iteration_count = 0 - - for node_iter in nodes: - #TESTING MPI BARRIER - #replace this with if my rank + 1 <= node_iter - #if my_line_num <= node_iter: - #if rank <= node_iter - 1: - # Create a new communicator for the selected ranks - if rank <= node_iter - 1: - new_comm = comm.Split(color=1, key=rank) # Grouping ranks > some_count - else: - new_comm = comm.Split(color=MPI.UNDEFINED, key=rank) # Exclude ranks <= some_count - - if new_comm != MPI.COMM_NULL: - for block_size in block_sizes: - #print(f"This iteration's block size is: {block_size}") - for job_count in proc: - - file_count = job_count - - #Reset file contents for FIO config file - file_contents = miscellaneous.reset_file_contents(original_file_contents, args, job_count, block_size,log_dir) - fio_job_config = f"{PyBench_root_dir}/examples/test_files/{job_number}_{hostname}_{job_count}p_{file_count}f_{block_size}_{args['io_type']}.fio" - with open(fio_job_config, 'w') as file: - file.write(file_contents) - - fio_ob_name = f"{hostname}_{node_iter}_{job_count}p_{file_count}f_{block_size}_{args['io_type']}" - fio_ob_dict[fio_ob_name] = handler_class.FIOTool() - - fio_ob_dict[fio_ob_name].setup_command(config_file=fio_job_config, output_format="json", output_file=f"{log_dir}/{hostname}_{node_iter}_{job_count}p_{file_count}f_{block_size}.json") - - with open(f"{command_log_dir}/{job_number}_{hostname}_{node_iter}_{job_count}p_{file_count}f_{block_size}_{args['platform_type']}_command", 'a') as file: - file.write(f"num nodes is 1, job number is {job_count}") - tmp_cmd_string = "" - for cmd_el in fio_ob_dict[fio_ob_name].command: - tmp_cmd_string += f" {cmd_el}" - file.write(tmp_cmd_string) - - network_counter_collection.stop_thread = False - background_thread = threading.Thread(target=background_network_monitor, args=(args, job_count, node_iter, block_size, PyBench_root_dir)) - background_thread.start() - start_time = time.time() - #TESTING MPI BARRIER - # Synchronize all processes at the barrier - print(f"Process {rank} is reaching the barrier.") - new_comm.Barrier() # Wait for all processes to reach this point - - # Once the barrier is passed, all processes continue - #current_time = time.time() - print(f"Process {rank} has passed the barrier. {time.time()}") - - # Continue with the rest of the code after the barrier - - print(f"{datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] starting fio Job num: {job_count}, node count: {node_iter}, IO type {args['io_type']} {time.time()}") - fio_ob_dict[fio_ob_name].run() - print(f"{datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] stopping fio Job num: {job_count}, node count: {node_iter}, IO type {args['io_type']} {time.time()}") - network_counter_collection.stop_thread = True - background_thread.join() - end_time = time.time() - - elapsed_time = end_time - start_time - print(f"{datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] Job num: {job_count}, node count: {node_iter}. Iteration is finished. 
{hostname} [s-{start_time}], [e-{end_time}, el-{elapsed_time}]") - - json_log_file = f"{log_dir}/{hostname}_{node_iter}_{job_count}p_{file_count}f_{block_size}.json" - uncombined_json_log_file = f"{log_dir}/uncombined_{node_iter}_{job_count}p_{block_size}.tmp" - first_barrier_file = f"{log_dir}/barrier_file_1_{iteration_count}.txt" - second_barrier_file = f"{log_dir}/barrier_file_2_{iteration_count}.txt" - - if 'unit_restart' in args: - if args['unit_restart'] == 1: - pattern = '/' - split_dir = re.split(pattern, args['directory']) - cephtest_root = '/'+split_dir[1]+'/'+split_dir[2] - miscellaneous.restart_ceph_unit(cephtest_root) - - #wait_res = 0 - #while wait_res == 0: - if os.path.exists(json_log_file): - bw, iops = miscellaneous.load_json_results(json_log_file) - - with open(uncombined_json_log_file, 'a') as file: - fcntl.flock(file, fcntl.LOCK_EX) # Lock the file for exclusive access - file.write(f"{hostname}, bw: {bw}, iops: {iops} \n") - fcntl.flock(file, fcntl.LOCK_UN) # Unlock the file after writing - else: - print(f"{datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] FIO JSON LOG FILE DOESN'T EXIST!!! - iteration {iteration_count}") - sys.exit() - - #This is wrong I think. Needs to change to a barrier outside of any iteration and before any log combination/modification. It still works because rank 0 is the only rank taking action after the iterations but... Either way it works... Just think about whether this is the spot to put it and about whether it's this communicator or the default communicator that should be used. - new_comm.Barrier() # Wait for all processes to reach this point - - #Probably remove this - ''' - if 'wait_for_others' in args.keys(): - # B) Barrier Phase=1: "Done with iteration i" - multi_level_barrier.barrier_phase(first_barrier_file, iteration_count, hostname, phase=1, node_count=node_iter) - - # C) Barrier Phase=2: "Ready for iteration i+1" - # This ensures that *every node* knows that every other node has finished iteration i. 
- multi_level_barrier.barrier_phase(second_barrier_file, iteration_count, hostname, phase=2, node_count=node_iter) - iteration_count += 1 - #BETWEEN BARRIER AND ORIGINAL FILE_BASED WAITING - if args['wait_for_others']: - wait_res = count_lines_in_uncombined.wait_until_line_count_is_node_count(uncombined_json_log_file, hostname, node_iter, 1000) - else: - wait_res = count_lines_in_uncombined.wait_until_line_count_is_node_count(uncombined_json_log_file, hostname, node_iter, 100) - ''' - #sys.stdout.flush() - print("Sleeping for 15 seconds...") - time.sleep(15) - - - #probably change this so that only rank 0 executes anything - for node_iter in nodes: - for block_size in block_sizes: - for job_count in proc: - file_count = job_count - json_log_file = f"{log_dir}/{hostname}_{node_iter}_{job_count}p_{file_count}f_{block_size}.json" - combined_json_log_file = f"{log_dir}/combined_{node_iter}_{job_count}p_{file_count}f_{block_size}.json" - uncombined_json_log_file = f"{log_dir}/uncombined_{node_iter}_{job_count}p_{block_size}.tmp" - if os.path.exists(json_log_file): - bw, iops = miscellaneous.load_json_results(json_log_file) - - if rank == 0: - bw_total = 0 - iops_total = 0 - - with open (uncombined_json_log_file, 'r') as file: - uncombined_dict = {} - for line in file: - parts = line.split(',') - bw = float(parts[1].split(':')[1].strip()) - iops = float(parts[2].split(':')[1].strip()) - bw_total += bw - iops_total += iops - uncombined_dict[parts[0]] = { - "node_bw": bw, - "node_iops": iops - } - - data = { - "nodes": node_iter, - "processors": job_count, - "bw": bw_total, - "iops": iops_total - } - data['node_list'] = {} - for key, value in uncombined_dict.items(): - data['node_list'][key] = value - - with open(combined_json_log_file, 'w') as json_file: - json.dump(data, json_file, indent=4) - print(f"{datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] Data successfully written to {combined_json_log_file}") - - diff --git a/python_runs/independent_runs_independent_ranks.py b/python_runs/independent_runs_independent_ranks.py deleted file mode 100644 index c8858e6..0000000 --- a/python_runs/independent_runs_independent_ranks.py +++ /dev/null @@ -1,316 +0,0 @@ -import os -import fcntl -import re -import socket -import shutil -import handler_class -#from datetime import datetime -import datetime -import sys -import json -import benchmark_tools -import args_handler -import miscellaneous -#import network_counter_collection -from network_collect import network_counter_collection -import threading -import time -import mmap -import count_lines_in_uncombined -import multi_level_barrier -from mpi4py import MPI -from analyze_and_rebalance_load import * - -def independent_ranks(args, PyBench_root_dir): - #testing mpi - # Initialize MPI - comm = MPI.COMM_WORLD # Default communicator (all processes) - rank = comm.Get_rank() # Get the rank (ID) of this process - size = comm.Get_size() # Get the total number of processes - #finished mpi section - - def background_network_monitor(args, job_count, node_count, block_size, PyBench_root_dir): - print("network_monitoring") - network_counter_collection.monitor_traffic(args, job_count, node_count, block_size, PyBench_root_dir) - - job_number = args['slurm_job_number'] - total_node_count = int(args['total_node_count']) - - fio_ob_dict = {} - fio_out_dict = {} - - proc = list(benchmark_tools.split_arg_sequence(args['job_number'], '--job-number')) - nodes = list(benchmark_tools.split_arg_sequence(str(args['node_count']), '--node-count')) - block_sizes = 
benchmark_tools.split_block_size_sequence(args['block_size'], '--block-size') - #print(f"{block_sizes} type is {type(block_sizes)}") - - config_template_path = args['template_path'] - - with open(config_template_path, 'r') as file: - original_file_contents = file.read() - - log_dir = f"{PyBench_root_dir}/results/{args['io_type']}/{args['platform_type']}/{job_number}" - command_log_dir = f"{log_dir}/commands" - - today = datetime.date.today() - formatted = today.strftime("%m/%d/%Y") # e.g. "03/19/2025" - - if 'job_note' in args.keys(): - job_note = f"{args['job_note']} {args['io_type']} {formatted}" - with open(f"{log_dir}/job_note.txt", 'w') as file: - file.write(job_note) - - if args['file_size']: - pass - else: - print( "Must specify a file size for FIO to write out..." ) - sys.exit() - - hostname = socket.gethostname() - - #put this code into miscellaneous - node_split_file = f"{log_dir}/host_list" - miscellaneous.insert_entry_and_check_completion(node_split_file, hostname, total_node_count) - - #Is this still needed (probably not)? - my_line_num = miscellaneous.grep_string(node_split_file, hostname) - - #as job progesses iteration count increases - iteration_count = 0 - - # 1) Get this rank’s hostname - hostname = socket.gethostname() - - # 2) Gather all hostnames - all_hostnames = comm.allgather(hostname) - - # 3) Build a sorted list of unique hostnames - unique_hosts = sorted(set(all_hostnames)) - - # 4) Create a dict mapping each hostname to a 0-based index - host_to_index = {host: i for i, host in enumerate(unique_hosts)} - - # Assign our node index to my_node_count - my_node_count = host_to_index[hostname] + 1 - #my_node_count = int(os.environ["OMPI_COMM_WORLD_NODE_RANK"]) + 1 - local_rank = int(os.environ["OMPI_COMM_WORLD_LOCAL_RANK"]) + 1 - - #create a map between hostnames and generic indexed hostnames - if local_rank == 1: - miscellaneous.create_hostname_mapping(log_dir,my_node_count) - - # copy YAML config file to output directory for extra logging - if local_rank == 1 and my_node_count == 1: - if os.path.isfile(args['config']): - shutil.copy(args['config'],log_dir) - - start_and_end_path = f"{log_dir}/start_and_end_times" - if local_rank == 1 and my_node_count == 1: - #start_and_end_path = f"{log_dir}/start_and_end_times" - if not os.path.exists(start_and_end_path): - os.mkdir(start_and_end_path) - - - for node_iter in nodes: - #TESTING MPI BARRIER - #replace this with if my rank + 1 <= node_iter - #if my_line_num <= node_iter: - #if rank <= node_iter - 1: - # Create a new communicator for the selected ranks - - for block_size in block_sizes: - for job_count in proc: - #print(f"This iteration's block size is: {block_size}") - new_comm = {} - #print(f"{hostname}: My node count = {my_node_count} and my local rank = {local_rank}. 
iteration node count = {node_iter} and iteration job count = {job_count}") - if my_node_count <= node_iter and local_rank <= job_count: - #new_comm[my_node_count] = comm.Split(color=my_node_count+1, key=rank) # Grouping ranks > some_count - iteration_comm = comm.Split(color=1,key=rank) - else: - #new_comm[my_node_count] = comm.Split(color=MPI.UNDEFINED, key=rank) # Exclude ranks <= some_count - iteration_comm = comm.Split(color=MPI.UNDEFINED, key=rank) - - - if iteration_comm != MPI.COMM_NULL: - - #just for organizational purposes, not related to actual mpi rank - global_rank = my_node_count * local_rank - - #print(f"{hostname}: local rank: {local_rank}, my node count: {my_node_count}, global rank: {global_rank}, total node count: {node_iter}, ranks per node: {job_count}, io type: {args['io_type']}") - - file_count = job_count - - #Reset file contents for FIO config file - file_contents = miscellaneous.reset_file_contents(original_file_contents, args, 1, block_size,log_dir,local_rank) - fio_job_config = f"{PyBench_root_dir}/examples/test_files/{job_number}_{hostname}_{local_rank}_{node_iter}n_{job_count}p_{file_count}f_{block_size}_{args['io_type']}.fio" - with open(fio_job_config, 'w') as file: - file.write(file_contents) - - fio_ob_name = f"{hostname}_{local_rank}_{node_iter}_{job_count}p_{file_count}f_{block_size}_{args['io_type']}" - fio_ob_dict[fio_ob_name] = handler_class.FIOTool() - - fio_ob_dict[fio_ob_name].setup_command(config_file=fio_job_config, output_format="json", output_file=f"{log_dir}/{hostname}_{local_rank}_{node_iter}_{job_count}p_{file_count}f_{block_size}.json") - - with open(f"{command_log_dir}/{job_number}_{hostname}_{local_rank}_{node_iter}_{job_count}p_{file_count}f_{block_size}_{args['platform_type']}_command", 'a') as file: - file.write(f"num nodes is {node_iter}, job number is {job_count}") - tmp_cmd_string = "" - for cmd_el in fio_ob_dict[fio_ob_name].command: - tmp_cmd_string += f" {cmd_el}" - file.write(tmp_cmd_string) - - - #network_counter_collection.stop_thread = False - #background_thread = threading.Thread(target=background_network_monitor, args=(args, job_count, node_iter, block_size, PyBench_root_dir)) - #background_thread.start() - start_time = time.time() - #TESTING MPI BARRIER - # Synchronize all processes at the barrier - #print(f"rank {local_rank} on node {my_node_count} is reaching the barrier.") - - # Once the barrier is passed, all processes continue - #current_time = time.time() - #print(f"rank {local_rank} on node {my_node_count} has passed the barrier. 
{time.time()}") - - # Continue with the rest of the code after the barrier - - iteration_comm.Barrier() # Wait for all processes to reach this point - start_time = time.time() - starting_statement = f"{datetime.datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] starting fio Job num: {job_count}, node count: {node_iter}, local rank {local_rank}, node count {my_node_count}, IO type {args['io_type']} {time.time()} \n" - fio_ob_dict[fio_ob_name].run() - end_time = time.time() - ending_statement = f"{datetime.datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] stopping fio Job num: {job_count}, node count: {node_iter}, local rank {local_rank}, node count {my_node_count}, IO type {args['io_type']} {time.time()} \n" - - ''' - if rank == my_node_count*1 - 1: - #print(f"rank {rank} from node count: {my_node_count} wants to receive.") - #combined_start_times = [] - combined_times = [] - combined_times.append((hostname, rank, start_time, end_time)) - receive_list = [] - previous_rank = 0 - - for i in range(job_count - 1): - if i == 0: - receive_list.append(rank + node_iter) - else: - receive_list.append(receive_list[i-1] + node_iter) - - #print(f"rank: {rank}, receive_list = {receive_list}") - for source_rank in receive_list: - rank_start_and_end = iteration_comm.recv(source=(source_rank), tag=0) - combined_times.append(rank_start_and_end) - #print(f"{rank} received {rank_start_and_end} from source rank {source_rank}") - #print(f"Rank: wants to receive data from source rank: {source_rank - 1}") - #combined_end_times.append([f"{local_rank}_end": end_time]) - - - with open(f"{start_and_end_path}/{job_number}_{hostname}_{node_iter}_{job_count}p_{file_count}f_{block_size}_{args['platform_type']}_times", 'a') as file: - json.dump(combined_times, file, indent=4) - ''' - if rank != 0: - combined_times = (hostname, rank, start_time, end_time) - iteration_comm.send(combined_times, dest=0, tag=0) - #iteration_comm.send((hostname, rank, start_time, end_time), dest=(my_node_count*1 - 1), tag=0) - #print(f"Rank: {rank} wants to send data to rank:{my_node_count*1 - 1}") - else: - start_end_times_list = [] - - for i in range(iteration_comm.Get_size()): - #print(i) - if i != 0: - start_end_times_list.append(iteration_comm.recv(source=i, tag=0)) - - iteration_comm.Barrier() - - #print(starting_statement) - #print(ending_statement) - - if rank == 0: - #log_and_analyze_data_points(log_dir, fio_ob_dict[fio_ob_name],start_end_times_list) - log_and_analyze_data_points(log_dir, fio_ob_dict[fio_ob_name]) - #network_counter_collection.stop_thread = True - #background_thread.join() - #end_time = time.time() - - #elapsed_time = end_time - start_time - #print(f"{datetime.datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] Job num: {job_count}, node count: {node_iter}. Iteration is finished. 
{hostname} [s-{start_time}], [e-{end_time}, el-{elapsed_time}]") - - json_log_file = f"{log_dir}/{hostname}_{local_rank}_{node_iter}_{job_count}p_{file_count}f_{block_size}.json" - uncombined_json_log_file = f"{log_dir}/uncombined_{node_iter}_{job_count}p_{block_size}.tmp" - #first_barrier_file = f"{log_dir}/barrier_file_1_{iteration_count}.txt" - #second_barrier_file = f"{log_dir}/barrier_file_2_{iteration_count}.txt" - - if local_rank == 1: - if 'unit_restart' in args: - if args['unit_restart'] == 1: - pattern = '/' - split_dir = re.split(pattern, args['directory']) - cephtest_root = '/'+split_dir[1]+'/'+split_dir[2] - miscellaneous.restart_ceph_unit(cephtest_root) - print(f"restarting the daemon on {hostname} from rank {rank}") - - #wait_res = 0 - #while wait_res == 0: - if os.path.exists(json_log_file): - bw, iops = miscellaneous.load_json_results(json_log_file) - - with open(uncombined_json_log_file, 'a') as file: - fcntl.flock(file, fcntl.LOCK_EX) # Lock the file for exclusive access - file.write(f"{hostname}, bw: {bw}, iops: {iops}, local rank: {local_rank}\n") - fcntl.flock(file, fcntl.LOCK_UN) # Unlock the file after writing - else: - print(f"{datetime.datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] {local_rank} FIO JSON LOG FILE DOESN'T EXIST!!! - iteration {iteration_count}") - sys.exit() - - #This is wrong I think. Needs to change to a barrier outside of any iteration and before any log combination/modification. It still works because rank 0 is the only rank taking action after the iterations but... Either way it works... Just think about whether this is the spot to put it and about whether it's this communicator or the default communicator that should be used. - iteration_comm.Barrier() # Wait for all processes to reach this point - - #Probably remove this - #sys.stdout.flush() - #print("Sleeping for 15 seconds...") - time.sleep(5) - - - #probably change this so that only rank 0 executes anything - for node_iter in nodes: - for block_size in block_sizes: - for job_count in proc: - if local_rank == 1 and my_node_count == 1: - file_count = job_count - json_log_file = f"{log_dir}/{hostname}_{node_iter}_{job_count}p_{file_count}f_{block_size}.json" - combined_json_log_file = f"{log_dir}/combined_{node_iter}_{job_count}p_{file_count}f_{block_size}.json" - uncombined_json_log_file = f"{log_dir}/uncombined_{node_iter}_{job_count}p_{block_size}.tmp" - - bw_total = 0 - iops_total = 0 - - with open (uncombined_json_log_file, 'r') as file: - uncombined_dict = {} - for line in file: - parts = line.split(',') - host = parts[0] - bw = float(parts[1].split(':')[1].strip()) - iops = float(parts[2].split(':')[1].strip()) - - if host not in uncombined_dict: - uncombined_dict[host] = {'node_bw': 0, 'node_iops': 0} - uncombined_dict[host]['node_bw'] += bw - uncombined_dict[host]['node_iops'] += iops - - bw_total += bw - iops_total += iops - - data = { - "nodes": node_iter, - "processors": job_count, - "bw": bw_total, - "iops": iops_total - } - data['node_list'] = {} - for key, value in uncombined_dict.items(): - data['node_list'][key] = value - - with open(combined_json_log_file, 'w') as json_file: - json.dump(data, json_file, indent=4) - print(f"{datetime.datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] Data successfully written to {combined_json_log_file}") - diff --git a/python_runs/mdtest_wrapper.py b/python_runs/mdtest_wrapper.py deleted file mode 100644 index 1954891..0000000 --- a/python_runs/mdtest_wrapper.py +++ /dev/null @@ -1,113 +0,0 @@ -import os -import socket -import 
handler_class -from datetime import datetime -import sys -import benchmark_tools -import args_handler -import miscellaneous -import network_collect -import threading -import time -import re -import shutil - -def wrap_mdtest(args, PyBench_root_dir): - - job_number = args['slurm_job_number'] - - current_dir = os.getcwd() - - mdtest_obj_dict = {} - #handler_class.mdtestTool() - - log_dir = f"{PyBench_root_dir}/results/{args['io_type']}/{args['platform_type']}/{job_number}" - command_log_dir = f"{log_dir}/commands" - tmp_log_dir = f"{log_dir}/tmp_files" - - hostname = socket.gethostname() - - mpi_ranks = sorted(benchmark_tools.split_arg_sequence(args['mpi_ranks'], "--mpi-ranks")) - files_per_rank_list = sorted(benchmark_tools.split_arg_sequence(args['files_per_rank'], "--files-per-rank")) - test_repetition = args['test_repetition'] - directory = args['directory'] - offset = args['offset'] - write_data = args['write_data'] - read_data = args['read_data'] - node_count = sorted(benchmark_tools.split_arg_sequence(args['node_count'], "--node-count")) - - total_files_optimized = 0 - - if args['timed']: - times = sorted(benchmark_tools.split_arg_sequence(args['timed'], "--timed")) - if len(times) != 2: - print ( f"When using the 'timed' option, please ensure to specify two comma-delimited integer values indicating a lower threshold and upper threshold of time in seconds that the test should run for. Values as interpreted are: {times}" ) - sys.exit() - lower_time_threshold = times[0] - upper_time_threshold = times[1] - - for node in node_count: - for rank in mpi_ranks: - tmp_rank = int(rank) - node_type = int(node) - tmp_rank = node_type * tmp_rank - ranks_per_node = int(tmp_rank / node_type) - for files_per_rank in files_per_rank_list: - if total_files_optimized != 0: - files_per_rank = total_files_optimized / tmp_rank - - out_file = f"{log_dir}/mdtest_output_{node_type}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank" - - print (f"ranks per node are {ranks_per_node} and type is {type(ranks_per_node)}, nodes are {node_type} and type is {type(node_type)}") - mdtest_obj_dict[f"{node_type}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank"] = handler_class.mdtestTool() - - mdtest_obj_dict[f"{node_type}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank"].setup_command(config_file=f"{PyBench_root_dir}/{args['config']}", mpi_ranks=f"{tmp_rank}", files_per_rank=f"{files_per_rank}", test_repetition=f"{test_repetition}", directory=f"{directory}", offset=f"{offset}", output_file=out_file, write_data=f"{write_data}", read_data=f"{read_data}", ranks_per_node=f"{ranks_per_node}", write_output=1) - - with open(f"{command_log_dir}/mdtest_command_{node_type}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank", 'a') as file: - file.write(f"The following is the mdtest command") - tmp_cmd_string = "" - for cmd_el in mdtest_obj_dict[f"{node_type}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank"].command: - tmp_cmd_string += f" {cmd_el}" - file.write(tmp_cmd_string) - - mdtest_obj_dict[f"{node_type}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank"].run() - - start_time, end_time, elapsed_time = benchmark_tools.mdtest_start_end_elapsed_time(out_file) - - if args['timed']: - #elapsed_time, out_file, tmp_log_dir, tmp_log_filename, lower_threshold, higher_threshold, log_dir, args, among others - if elapsed_time <= lower_time_threshold or elapsed_time >= upper_time_threshold: - while elapsed_time <= lower_time_threshold or elapsed_time >= 
upper_time_threshold: - source = out_file - tmp_log_filename = re.split('/', out_file)[len(re.split('/', out_file)) - 1] - destination = f"{tmp_log_dir}/{tmp_log_filename}" - shutil.move(source, destination) - - if elapsed_time <= lower_time_threshold: - multiple = lower_time_threshold / elapsed_time - new_files_per_rank = int(files_per_rank * multiple) + int(0.05 * files_per_rank) - if elapsed_time >= upper_time_threshold: - multiple = upper_time_threshold / elapsed_time - new_files_per_rank = int(files_per_rank * multiple) - int(0.05 * files_per_rank) - - out_file = f"{log_dir}/mdtest_output_{node_type}_nodes_{ranks_per_node}_ranks_{new_files_per_rank}_new_files_per_rank_timed" - - mdtest_obj_dict[f"{node_type}_nodes_{ranks_per_node}_ranks_{new_files_per_rank}_new_files_per_rank"] = handler_class.mdtestTool() - mdtest_obj_dict[f"{node_type}_nodes_{ranks_per_node}_ranks_{new_files_per_rank}_new_files_per_rank"].setup_command(config_file=f"{PyBench_root_dir}/{args['config']}", mpi_ranks=f"{tmp_rank}", files_per_rank=f"{new_files_per_rank}", test_repetition=f"{test_repetition}", directory=f"{directory}", offset=f"{offset}", output_file=out_file, write_data=f"{write_data}", read_data=f"{read_data}", ranks_per_node=f"{ranks_per_node}", write_output=1) - with open(f"{command_log_dir}/mdtest_command_{node_type}_nodes_{ranks_per_node}_ranks_{new_files_per_rank}_new_files_per_rank_timed", 'a') as file: - file.write(f"The following is the mdtest command") - tmp_cmd_string = "" - for cmd_el in mdtest_obj_dict[f"{node_type}_nodes_{ranks_per_node}_ranks_{new_files_per_rank}_new_files_per_rank"].command: - tmp_cmd_string += f" {cmd_el}" - file.write(tmp_cmd_string) - - mdtest_obj_dict[f"{node_type}_nodes_{ranks_per_node}_ranks_{new_files_per_rank}_new_files_per_rank"].run() - old_elapsed_time = elapsed_time - start_time, end_time, elapsed_time = benchmark_tools.mdtest_start_end_elapsed_time(out_file) - - print (f"entered the optimizer. Old elapsed time: {old_elapsed_time}, New elapsed time: {elapsed_time}, old files_per_rank {files_per_rank}, new files per rank {new_files_per_rank}, multiple is: {multiple}") - files_per_rank = new_files_per_rank - total_files_optimized = files_per_rank * tmp_rank - print(f"mdtest job {node_type}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank is finished. 
[s-{start_time}], [e-{end_time}], elapsed time: {elapsed_time}") - - sys.stdout.flush() diff --git a/python_runs/mpi_run.py b/python_runs/mpi_run.py deleted file mode 100644 index a7addc9..0000000 --- a/python_runs/mpi_run.py +++ /dev/null @@ -1,43 +0,0 @@ -import os -from mpi4py import MPI -import sys -import benchmark_tools -import args_handler -import miscellaneous -from independent_runs import serverless_fio -from multi_node import server_fio -from mdtest_wrapper import wrap_mdtest -from IOR_wrapper import wrap_IOR -from testIOR_wrapper import test_wrap_IOR -from test_mdtest_wrapper import test_wrap_mdtest -import socket - -comm = MPI.COMM_WORLD -rank = comm.Get_rank() -size = comm.Get_size() - -hostname = socket.gethostname() -print(f"Rank {rank} of {size} running on {hostname}") -var_name = "PyBench_root_dir" - -try: - PyBench_root_dir = os.environ[var_name] - print(f"{var_name} = {PyBench_root_dir}") -except KeyError: - print(f"{var_name} is not set, please set the root directory before running this script.") - sys.exit(1) - -args = args_handler.handle_arguments() - -if args['benchmark'] == 'fio-server': - server_fio(args, PyBench_root_dir) -elif args['benchmark'] == 'fio-serverless': - serverless_fio(args, PyBench_root_dir) -elif args['benchmark'] == 'mdtest': - wrap_mdtest(args, PyBench_root_dir) -elif args['benchmark'] == 'testmdtest': - test_wrap_mdtest(args, PyBench_root_dir) -elif args['benchmark'] == 'newIORTool': - wrap_IOR(args, PyBench_root_dir) -elif args['benchmark'] =='testIORTool': - test_wrap_IOR(args, PyBench_root_dir) diff --git a/python_runs/multi_node.py b/python_runs/multi_node.py deleted file mode 100644 index 3a22b2f..0000000 --- a/python_runs/multi_node.py +++ /dev/null @@ -1,93 +0,0 @@ -import os -import handler_class -from datetime import datetime -import sys -import benchmark_tools -import args_handler -import miscellaneous - -''' -var_name = "PyBench_root_dir" - -try: - PyBench_root_dir = os.environ[var_name] - print(f"{var_name} = {PyBench_root_dir}") -except KeyError: - print(f"{var_name} is not set, please set the root directory before running this script.") - sys.exit(1) -''' - -#args = args_handler.handle_arguments() - -def server_fio(args, PyBench_root_dir): - job_number = args['slurm_job_number'] - - # Count the number of lines in the job hosts file - line_count = benchmark_tools.count_lines(args['hosts_file']) - - current_dir = os.getcwd() - - fio_ob_dict = {} - fio_out_dict = {} - - proc = benchmark_tools.split_arg_sequence(args['job_number'], '--job-number') - nodes = benchmark_tools.split_arg_sequence(str(args['node_count']), '--node-count') - files = nodes.copy() - - set_noscrub = args['no_scrub'] - - if args['split_hosts_file']: - benchmark_tools.create_node_list_file(args['node_count'], args['hosts_file'], PyBench_root_dir, job_number) - - config_template_path = args['template_path'] - - with open(config_template_path, 'r') as file: - original_file_contents = file.read() - - fio_scrub = handler_class.FIOTool() - - if set_noscrub == 1: - fio_scrub.set_noscrub() - - log_dir = f"{PyBench_root_dir}/results/{args['io_type']}/{args['platform_type']}/{job_number}" - command_log_dir = f"{log_dir}/commands" - - miscellaneous.ensure_log_directory_exists(log_dir,1) - miscellaneous.ensure_log_directory_exists(command_log_dir,1) - - for node_count in nodes: - - for job_count in proc: - - file_count = job_count - - # Reset file_contents to the original template for each iteration - file_contents = original_file_contents - file_contents = 
file_contents.replace("__block_size__", args['block_size']) - file_contents = file_contents.replace("__number_of_jobs__", f"{job_count}") - file_contents = file_contents.replace("__dir_var__", args['directory']) - file_contents = file_contents.replace("__io_type_var__", args['io_type']) - file_contents = file_contents.replace("__time_var__",f"{args['time']}") - - with open(f"examples/test_files/multinode_{job_count}p_{file_count}f_{args['block_size']}_{args['io_type']}.fio", 'w') as file: - file.write(file_contents) - - fio_ob_dict[f"{node_count}n_{job_count}p_{file_count}f_{args['io_type']}"] = handler_class.FIOTool() - - fio_ob_dict[f"{node_count}n_{job_count}p_{file_count}f_{args['io_type']}"].setup_command(config_file=f"{PyBench_root_dir}/examples/test_files/multinode_{job_count}p_{file_count}f_{args['block_size']}_{args['io_type']}.fio", output_format="json", output_file=f"{log_dir}/{node_count}n_{job_count}p_{file_count}f_{args['block_size']}.json", host_file=f"{PyBench_root_dir}/host_files/{job_number}_{node_count}_hosts.file") - - with open(f"{command_log_dir}/{job_number}_{node_count}n_{job_count}p_{file_count}f_{args['platform_type']}_command", 'a') as file: - file.write(f"num nodes is {node_count}, job number is {job_count}") - tmp_cmd_string = "" - for cmd_el in fio_ob_dict[f"{node_count}n_{job_count}p_{file_count}f_{args['io_type']}"].command: - tmp_cmd_string += f" {cmd_el}" - file.write(tmp_cmd_string) - - fio_ob_dict[f"{node_count}n_{job_count}p_{file_count}f_{args['io_type']}"].run() - - print(f"Job num: {job_count}, node count: {node_count}. Iteration is finished.") - sys.stdout.flush() - - if set_noscrub == 1: - fio_scrub.set_scrub() - diff --git a/python_runs/preallocate_all_files.py b/python_runs/preallocate_all_files.py deleted file mode 100644 index d089458..0000000 --- a/python_runs/preallocate_all_files.py +++ /dev/null @@ -1,93 +0,0 @@ -import os -import handler_class -from datetime import datetime -import sys -import benchmark_tools -import args_handler -import miscellaneous - -var_name = "PyBench_root_dir" - -try: - PyBench_root_dir = os.environ[var_name] - print(f"{var_name} = {PyBench_root_dir}") -except KeyError: - print(f"{var_name} is not set, please set the root directory before running this script.") - sys.exit(1) - -args = args_handler.handle_arguments() - -job_number = args['slurm_job_number'] - -# Count the number of lines in the job hosts file -line_count = benchmark_tools.count_lines(args['hosts_file']) - -current_dir = os.getcwd() - -fio_ob_dict = {} -fio_out_dict = {} - -proc = benchmark_tools.split_arg_sequence(args['job_number'], '--job-number') -nodes = benchmark_tools.split_arg_sequence(str(args['node_count']), '--node-count') -files = nodes.copy() - -set_noscrub = args['no_scrub'] - -if args['split_hosts_file']: - benchmark_tools.create_node_list_file(args['node_count'], args['hosts_file'], PyBench_root_dir, job_number) - -config_template_path = args['template_path'] - -with open(config_template_path, 'r') as file: - original_file_contents = file.read() - -fio_scrub = handler_class.FIOTool() - -if set_noscrub == 1: - fio_scrub.set_noscrub() - -log_dir = f"{PyBench_root_dir}/results/{args['io_type']}/{args['platform_type']}/{job_number}" -command_log_dir = f"{log_dir}/commands" - -miscellaneous.ensure_log_directory_exists(log_dir,1) -miscellaneous.ensure_log_directory_exists(command_log_dir,1) - -for node_count in nodes: - - - for job_count in proc: - - file_count = job_count - - # Reset file_contents to the original template for each iteration 
- file_contents = original_file_contents - file_contents = file_contents.replace("__block_size__", args['block_size']) - file_contents = file_contents.replace("__number_of_jobs__", f"{job_count}") - file_contents = file_contents.replace("__dir_var__", args['directory']) - file_contents = file_contents.replace("__io_type_var__", args['io_type']) - file_contents = file_contents.replace("__time_var__",f"{args['time']}") - - with open(f"examples/test_files/multinode_{job_count}p_{file_count}f_{args['block_size']}_{args['io_type']}.fio", 'w') as file: - file.write(file_contents) - - fio_ob_dict[f"{node_count}n_{job_count}p_{file_count}f_{args['io_type']}"] = handler_class.FIOTool() - - fio_ob_dict[f"{node_count}n_{job_count}p_{file_count}f_{args['io_type']}"].setup_command(config_file=f"{PyBench_root_dir}/examples/test_files/multinode_{job_count}p_{file_count}f_{args['block_size']}_{args['io_type']}.fio", output_format="json", output_file=f"{log_dir}/{node_count}n_{job_count}p_{file_count}f_{args['block_size']}.json", host_file=f"{PyBench_root_dir}/host_files/{job_number}_{node_count}_hosts.file") - - fio_ob_dict[f"{node_count}n_{job_count}p_{file_count}f_{args['io_type']}"].run() - - log_file_path = f"{log_dir}/{node_count}n_{job_count}p_{file_count}f_{args['block_size']}.json" - - ''' - if os.path.exists(log_file_path): - os.remove(log_file_path) - print(f"File {log_file_path} has been removed.") - else: - print(f"File {log_file_path} does not exist.") - ''' - - print(f"Job num: {job_count}, node count: {node_count}. Iteration is finished.") - -if set_noscrub == 1: - fio_scrub.set_scrub() - diff --git a/python_runs/prep_work.py b/python_runs/prep_work.py deleted file mode 100644 index a86897d..0000000 --- a/python_runs/prep_work.py +++ /dev/null @@ -1,45 +0,0 @@ -import os -import sys -import miscellaneous -from args_handler import handle_arguments - -def prep_work(args, PyBench_root_dir): - job_number = args['slurm_job_number'] - - #log_dir = f"{PyBench_root_dir}/results/{args['io_type']}/{args['platform_type']}/{job_number}" - if args['benchmark'] == "newIORTool" or args['benchmark'] == "testIORTool": - log_dir = f"{PyBench_root_dir}/results/iortest/{args['io_type']}/{args['platform_type']}/{job_number}" - if args['benchmark'] == "testmdtest": - log_dir = f"{PyBench_root_dir}/results/{args['not_taken_into_account']['io_type']}/{args['not_taken_into_account']['platform_type']}/{job_number}" - else: - log_dir = f"{PyBench_root_dir}/results/{args['io_type']}/{args['platform_type']}/{job_number}" - - command_log_dir = f"{log_dir}/commands" - network_log_dir = f"{PyBench_root_dir}/network_stats/{job_number}" - test_files_log = f"{PyBench_root_dir}/examples/test_files" - tmp_log_dir = f"{log_dir}/tmp_files" - - miscellaneous.ensure_log_directory_exists(log_dir,1) - miscellaneous.ensure_log_directory_exists(command_log_dir,1) - miscellaneous.ensure_log_directory_exists(network_log_dir, 1) - miscellaneous.ensure_log_directory_exists(test_files_log, 1) - miscellaneous.ensure_log_directory_exists(tmp_log_dir, 1) - - try: - with open(f"{log_dir}/hostname_mapping.txt", "x") as file: - #file.write("Hostname_mapping file was created because it did not exist.\n") - pass - except FileExistsError: - print("File already exists.") - -var_name = "PyBench_root_dir" - -try: - PyBench_root_dir = os.environ[var_name] - #print(f"{var_name} = {PyBench_root_dir}") -except KeyError: - print(f"{var_name} is not set, please set the root directory before running this script.") - sys.exit(1) - -args = handle_arguments() 
-prep_work(args, PyBench_root_dir) diff --git a/python_runs/run.py b/python_runs/run.py deleted file mode 100644 index 8abdbe5..0000000 --- a/python_runs/run.py +++ /dev/null @@ -1,38 +0,0 @@ -import os -import sys -import benchmark_tools -import args_handler -import miscellaneous -from independent_runs import serverless_fio -from independent_runs_independent_ranks import independent_ranks -from multi_node import server_fio -from mdtest_wrapper import wrap_mdtest -from IOR_wrapper import wrap_IOR -from testIOR_wrapper import test_wrap_IOR -from test_mdtest_wrapper import test_wrap_mdtest - -var_name = "PyBench_root_dir" - -try: - PyBench_root_dir = os.environ[var_name] - #print(f"{var_name} = {PyBench_root_dir}") -except KeyError: - print(f"{var_name} is not set, please set the root directory before running this script.") - sys.exit(1) - -args = args_handler.handle_arguments() - -if args['benchmark'] == 'fio-server': - server_fio(args, PyBench_root_dir) -elif args['benchmark'] == 'fio-serverless': - serverless_fio(args, PyBench_root_dir) -elif args['benchmark'] == 'fio-independent-ranks': - independent_ranks(args, PyBench_root_dir) -elif args['benchmark'] == 'mdtest': - wrap_mdtest(args, PyBench_root_dir) -elif args['benchmark'] == 'testmdtest': - test_wrap_mdtest(args, PyBench_root_dir) -elif args['benchmark'] == 'newIORTool': - wrap_IOR(args, PyBench_root_dir) -elif args['benchmark'] =='testIORTool': - test_wrap_IOR(args, PyBench_root_dir) diff --git a/python_runs/testIOR_wrapper.py b/python_runs/testIOR_wrapper.py deleted file mode 100644 index fa0717d..0000000 --- a/python_runs/testIOR_wrapper.py +++ /dev/null @@ -1,136 +0,0 @@ -import os -import socket -import handler_class -from datetime import datetime -import json -import sys -import yaml -import benchmark_tools -import args_handler -import miscellaneous -import network_collect -import threading -import time -import re -import shutil - -def test_wrap_IOR(args, PyBench_root_dir): - - job_number = args['slurm_job_number'] - - current_dir = os.getcwd() - - mdtest_obj_dict = {} - #handler_class.mdtestTool() - - log_dir = f"{PyBench_root_dir}/results/iortest/{args['io_type']}/{args['platform_type']}/{job_number}" - command_log_dir = f"{log_dir}/commands" - tmp_log_dir = f"{log_dir}/tmp_files" - - hostname = socket.gethostname() - - required_args = ['config','mpi_ranks', 'node_count', 'blockSize', 'filename', 'transferSize', 'segmentCount'] - for i in required_args: - if i not in args: - arg_string = i.replace('_','-') - print(f"Missing option --{arg_string}. Please look at IOR and/or MPI documentation and fix this error.") - sys.exit() - elif not args[i]: - arg_string = i.replace('_','-') - print(f"Incorrect --{arg_string} usage. Please look at IOR and/or MPI documentation and fix this error.") - sys.exit() - - print("Required arguments seem valid...") - - config_file = args['config'] - print(config_file) - - if config_file: - try: - with open (config_file, 'r') as opts_file: - config = yaml.safe_load(opts_file) - except yaml.YAMLError as e: - print(f"Error loading YAML file: {e}") - except FileNotFoundError as e: - print(f"File not found: {e}") - except Exception as e: - print(f"An unexpected error has occurred: {e}") - else: - raise ValueError("Configuration file must be specified. 
IOR...") - - for key,value in config.items(): - if key != "config_options" and key != "command_extensions": - print(f"{key}: {value}") - if key == "config_options": - print("Configuration options:") - for key,value in config["config_options"].items(): - print (f"{key}: {value}") - if key == "command_extensions": - print("Command extensions:") - for i in config["command_extensions"]: - print(f"{i}") - - if 'job_note' in args.keys(): - with open(f"{log_dir}/job_note.txt", 'w') as file: - file.write(args['job_note']) - - mpi_ranks = list(benchmark_tools.split_arg_sequence(args['mpi_ranks'], "--mpi-ranks")) - filename = args['filename'] - node_count = list(benchmark_tools.split_arg_sequence(args['node_count'], "--node-count")) - block_size = args['blockSize'] - transfer_size = args['transferSize'] - segment_count = args['segmentCount'] - ior_obj_dict = {} - - print (f"SEQUENCES ARE {mpi_ranks} and nodes {node_count}") - for nodes in node_count: - for ranks in mpi_ranks: - print (f"BEGINNING OF LOOPS ---------------------- ranks per node: {ranks} and nodes: {nodes}") - - if 'unit_restart' in args: - if args['unit_restart'] == 1: - pattern = '/' - split_dir = re.split(pattern, filename) - cephtest_root = '/'+split_dir[1]+'/'+split_dir[2] - miscellaneous.restart_ceph_unit(cephtest_root) - - total_ranks = ranks * nodes - print(f"TOTAL RANKS ARE {total_ranks}") - output_file = f"{log_dir}/ranks_per_node_{ranks}_node_count_{nodes}" - - ior_obj_dict[f"{ranks}_{nodes}"] = handler_class.test_ior_tool() - ior_obj_dict[f"{ranks}_{nodes}"].setup_command(config=config, mpi_ranks=total_ranks,ranks_per_node=ranks,output_file=output_file) - - with open(f"{command_log_dir}/command_ior_{ranks}_{nodes}", 'w') as file: - tmp_cmd_string = "" - for cmd_el in ior_obj_dict[f"{ranks}_{nodes}"].command: - tmp_cmd_string += f" {cmd_el}" - file.write(tmp_cmd_string) - - ior_obj_dict[f"{ranks}_{nodes}"].run() - - ''' - for nodes in node_count: - for ranks in mpi_ranks: - #Output handling - output_file = f"{log_dir}/ranks_per_node_{ranks}_node_count_{nodes}" - combined_json_log_file = f"{log_dir}/combined_{ranks}_{nodes}_{block_size}" - json_log_file = output_file - - if os.path.exists(json_log_file): - bw, iops = miscellaneous.load_ior_json_results(json_log_file, log_dir) - else: - print( f"JSON LOG FILE NOT FOUND! 
------- {output_file}" ) - sys.exit() - - data = { - "nodes": nodes, - "processors": ranks, - "bw": bw, - "iops": iops - } - - with open(combined_json_log_file, 'w') as json_file: - json.dump(data, json_file, indent=4) - print(f"Data successfully written to {combined_json_log_file}") - ''' diff --git a/python_runs/test_mdtest_wrapper.py b/python_runs/test_mdtest_wrapper.py deleted file mode 100644 index ecbaf72..0000000 --- a/python_runs/test_mdtest_wrapper.py +++ /dev/null @@ -1,298 +0,0 @@ -import os -import socket -import handler_class -from datetime import datetime -import sys -import benchmark_tools -import args_handler -import miscellaneous -import network_collect -import threading -import time -import re -import shutil -import subprocess -from datetime import datetime - -def test_wrap_mdtest(args, PyBench_root_dir): - - job_number = args['slurm_job_number'] - - current_dir = os.getcwd() - - mdtest_obj_dict = {} - #handler_class.mdtestTool() - - log_dir = f"{PyBench_root_dir}/results/{args['not_taken_into_account']['io_type']}/{args['not_taken_into_account']['platform_type']}/{job_number}" - command_log_dir = f"{log_dir}/commands" - tmp_log_dir = f"{log_dir}/tmp_files" - - hostname = socket.gethostname() - - mpi_ranks = sorted(benchmark_tools.split_arg_sequence(args['general_opts']['mpi_ranks'], "--mpi-ranks")) - files_per_rank_list = sorted(benchmark_tools.split_arg_sequence(args['mdtest_opts']['files_per_rank'], "--files-per-rank")) - #test_repetition = args['test_repetition'] - directory = args['mdtest_opts']['directory'] - #offset = args['offset'] - #write_data = args['write_data'] - #read_data = args['read_data'] - node_count = sorted(benchmark_tools.split_arg_sequence(args['general_opts']['node_count'], "--node-count")) - - - #config_file = args['config'] - #config = miscellaneous.get_config_params(config_file) - - total_files_optimized = 0 - if args['not_taken_into_account']['timed']: - times = sorted(benchmark_tools.split_arg_sequence(args['not_taken_into_account']['timed'], "--timed")) - if len(times) != 2: - print ( f"{datetime.now().strftime('%b %d %H:%M:%S')} When using the 'timed' option, please ensure to specify two comma-delimited integer values indicating a lower threshold and upper threshold of time in seconds that the test should run for. Values as interpreted are: {times}" ) - sys.exit() - lower_time_threshold = times[0] - upper_time_threshold = times[1] - - - if 'job_note' in args['not_taken_into_account'].keys(): - job_note = f"{args['not_taken_into_account']['job_note']}" - with open(f"{log_dir}/job_note.txt", 'w') as file: - file.write(job_note) - else: - print("{datetime.now().strftime('%b %d %H:%M:%S')} Job note required for job tracking. Please include an argument under the \"not_taken_into_account\" YAML dict") - sys.exit() - - #If the "--in-parts" argument is used, we will break out into a separate a logical branch. This new branch may become the main branch and the old logic may take the place of the current logic. Whatever parts of the old code that can be recycled, should be? - if "not_taken_into_account" in args.keys(): - print("in not_taken_into_account") - if "in_parts" in args["not_taken_into_account"].keys(): - if args["not_taken_into_account"]["in_parts"]: - print("in in_parts") - #for each iteratable element in general_opts and mdtest_opts I want a counter, a way to iterate over it (so maybe a key), and the sub-elements. 
EXCEPT command-extensions: - result_dict = {} - combined_opts = {**args["general_opts"],**args["mdtest_opts"]} - - for key, value in combined_opts.items(): - if isinstance(value, str) and ',' in value: # Handle comma-separated strings - value_list = list(value.split(',')) - else: # Handle single values or non-comma-separated strings - value_list = [value] - if key == "command_extensions": - pass - - result_dict[key] = {key:value_list, 'counter': len(value_list) - 1, 'tmp_counter': len(value_list) - 1} - - entered_loop = 0 - for node in result_dict['node_count']['node_count']: - for rank in result_dict['mpi_ranks']['mpi_ranks']: - tmp_rank = int(rank) - tmp_node = int(node) - tmp_rank = tmp_node * tmp_rank - ranks_per_node = int(tmp_rank / tmp_node) - - universal_key_counter = 1 - for key, value in result_dict.items(): - value['tmp_counter'] = value['counter'] - - files_per_rank = int(int(result_dict['files_per_rank']['files_per_rank'][result_dict['files_per_rank']['tmp_counter']]) / int(tmp_rank)) - entered_loop = 1 - while universal_key_counter != 0: - universal_key_counter = 0 - tmp_result_dict = {} - for command_extent_element in reversed(result_dict['command_extensions']['command_extensions']): - if 'unit_restart' in args['not_taken_into_account'].keys(): - if args['not_taken_into_account']['unit_restart'] == 1: - pattern = '/' - split_dir = re.split(pattern, directory) - FS_root = '/'+split_dir[1]+'/'+split_dir[2] - miscellaneous.restart_ceph_unit(FS_root) - - #Set temporary values for the current loop and the output file - for key2,value2 in result_dict.items(): - if key2 == "command_extensions": - tmp_result_dict["command_extensions"] = command_extent_element - else: - tmp_result_dict[f"{key2}"] = value2[f"{key2}"][value2['tmp_counter']] - - config = {**tmp_result_dict,**args["not_taken_into_account"]} - - out_file = f"{log_dir}/mdtest_output_{command_extent_element}_{tmp_node}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank" - #Print statistics - print (f"{datetime.now().strftime('%b %d %H:%M:%S')} ranks per node are {ranks_per_node}, nodes are {tmp_node}, and mdtest job type is {command_extent_element}") - #--------- - - #Create mdtest object - mdtest_obj_dict[f"{command_extent_element}_{tmp_node}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank"] = handler_class.mdtestTool() - mdtest_obj_dict[f"{command_extent_element}_{tmp_node}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank"].setup_command(config=config, config_file=f"{PyBench_root_dir}/{args['config']}", mpi_ranks=f"{tmp_rank}", files_per_rank=f"{files_per_rank}", directory=f"{directory}", output_file=out_file, ranks_per_node=f"{ranks_per_node}", write_output=args['mdtest_opts']['write_output']) - - #Write command into command output file - with open(f"{command_log_dir}/mdtest_{command_extent_element}_{tmp_node}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank", 'a') as file: - file.write(f"The following is the mdtest command") - tmp_cmd_string = "" - for cmd_el in mdtest_obj_dict[f"{command_extent_element}_{tmp_node}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank"].command: - tmp_cmd_string += f" {cmd_el}" - file.write(tmp_cmd_string) - #--------- - - #Run mdtest through object and enter optimizer as necessary - mdtest_obj_dict[f"{command_extent_element}_{tmp_node}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank"].run() - print(f"Does this object exist?, ", 
mdtest_obj_dict[f"{command_extent_element}_{tmp_node}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank"].elapsed_time) - - if int(mdtest_obj_dict[f"{command_extent_element}_{tmp_node}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank"].elapsed_time) <= 10: - print("Warning!!! This run took less than 10 seconds!, not sleeping for now...") - #time.sleep(10) - - #create/delete snapshots SECTION - - if command_extent_element == "YuC": - if not os.path.exists(f"{directory}/.snap/test_snapshots"): - result = subprocess.run(f"mkdir {directory}/.snap/test_snapshots", shell=True, capture_output=False, text=True, check=True) - print("{datetime.now().strftime('%b %d %H:%M:%S')} Creating snapshot after running creation mdtest...") - else: - print("{datetime.now().strftime('%b %d %H:%M:%S')} snapshot already exists, something went wrong") - - if command_extent_element == "Yur": - if os.path.exists(f"{directory}/.snap/test_snapshots"): - print("{datetime.now().strftime('%b %d %H:%M:%S')} Deleting snapshot after running deletion mdtest...") - result = subprocess.run(f"rmdir {directory}/.snap/test_snapshots", shell=True, capture_output=False, text=True, check=True) - else: - print("{datetime.now().strftime('%b %d %H:%M:%S')} snapshot doesn't exist, something went wrong.") - - #-------- snapshots SECTION ends here - - # Iterate through arguments that have still not been used, decrement temporary counters - for key, value in result_dict.items(): - - if key != 'mpi_ranks' and key != 'node_count' and key != 'write_output' and key != "command_extensions": - if value['tmp_counter'] == 0: - pass - else: - value['tmp_counter'] = value['tmp_counter'] - 1 - universal_key_counter = 1 - #---------- - else: - print("not in_parts") - sys.exit() - ''' - for node in result_dict['node_count']['node_count']: - for rank in result_dict['mpi_ranks']['mpi_ranks']: - universal_key_counter = 1 - for key, value in result_dict.items(): - value['tmp_counter'] = value['counter'] - while universal_key_counter != 0: - universal_key_counter = 0 - final_command = f"mpirun -n {rank} --map-by node -N {node} --verbose mdtest" - for key, value in result_dict.items(): - if key != 'mpi_ranks' and key != 'node_count' and key != 'write_output': - if value['tmp_counter'] == 0: - if key in key_map: - final_command += f" -{key_map[key]} {str(value[key][0])}" - else: - final_command += f" -{key} {str(value[key][0])}" - else: - if key in key_map: - final_command += f" -{key_map[key]} {str(value[key][value['tmp_counter']])}" - else: - final_command += f" -{key} {str(value[key][value['tmp_counter']])}" - value['tmp_counter'] = value['tmp_counter'] - 1 - universal_key_counter = 1 - #if key == 'command_extensions' - # final_command += f" -{str(value[key][value['counter']])}" - print(final_command) - ''' - else: - print("General ops which are not yet taken into account are required.") - sys.exit() - sys.exit() - for node in node_count: - for rank in mpi_ranks: - tmp_rank = int(rank) - tmp_node = int(node) - tmp_rank = tmp_node * tmp_rank - ranks_per_node = int(tmp_rank / tmp_node) - for files_per_rank in files_per_rank_list: - - if 'unit_restart' in args: - if args['unit_restart'] == 1: - pattern = '/' - split_dir = re.split(pattern, directory) - FS_root = '/'+split_dir[1]+'/'+split_dir[2] - miscellaneous.restart_ceph_unit(FS_root) - - if total_files_optimized != 0: - files_per_rank = total_files_optimized / tmp_rank - - if files_per_rank < 10: - files_per_rank = 10 - #elif files_per_rank > 10000: - # files_per_rank = 10000 - 
out_file = f"{log_dir}/mdtest_output_{tmp_node}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank" - - print (f"ranks per node are {ranks_per_node} and type is {type(ranks_per_node)}, nodes are {tmp_node} and type is {type(tmp_node)}") - mdtest_obj_dict[f"{tmp_node}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank"] = handler_class.mdtestTool() - - mdtest_obj_dict[f"{tmp_node}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank"].setup_command(config=config, config_file=f"{PyBench_root_dir}/{args['config']}", mpi_ranks=f"{tmp_rank}", files_per_rank=f"{files_per_rank}", directory=f"{directory}", output_file=out_file, ranks_per_node=f"{ranks_per_node}", write_output=args['write_output']) - #mdtest_obj_dict[f"{tmp_node}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank"].setup_command(config_file=f"{PyBench_root_dir}/{args['config']}", mpi_ranks=f"{tmp_rank}", files_per_rank=f"{files_per_rank}", test_repetition=f"{test_repetition}", directory=f"{directory}", offset=f"{offset}", output_file=out_file, write_data=f"{write_data}", read_data=f"{read_data}", ranks_per_node=f"{ranks_per_node}") - - with open(f"{command_log_dir}/mdtest_command_{tmp_node}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank", 'a') as file: - file.write(f"The following is the mdtest command") - tmp_cmd_string = "" - for cmd_el in mdtest_obj_dict[f"{tmp_node}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank"].command: - tmp_cmd_string += f" {cmd_el}" - file.write(tmp_cmd_string) - - mdtest_obj_dict[f"{tmp_node}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank"].run() - - start_time, end_time, elapsed_time = benchmark_tools.mdtest_start_end_elapsed_time(out_file) - - if args['timed']: - #elapsed_time, out_file, tmp_log_dir, tmp_log_filename, lower_threshold, higher_threshold, log_dir, args, among others - if elapsed_time <= lower_time_threshold or elapsed_time >= upper_time_threshold: - while elapsed_time <= lower_time_threshold or elapsed_time >= upper_time_threshold: - if 'unit_restart' in args: - if args['unit_restart'] == 1: - pattern = '/' - split_dir = re.split(pattern, directory) - FS_root = '/'+split_dir[1]+'/'+split_dir[2] - miscellaneous.restart_ceph_unit(FS_root) - - source = out_file - tmp_log_filename = re.split('/', out_file)[len(re.split('/', out_file)) - 1] - destination = f"{tmp_log_dir}/{tmp_log_filename}" - shutil.move(source, destination) - - if elapsed_time <= lower_time_threshold: - multiple = lower_time_threshold / elapsed_time - new_files_per_rank = int(files_per_rank * multiple) + int(0.05 * files_per_rank) - if elapsed_time >= upper_time_threshold: - multiple = upper_time_threshold / elapsed_time - new_files_per_rank = int(files_per_rank * multiple) - int(0.05 * files_per_rank) - - if new_files_per_rank < 10: - new_files_per_rank = 10 - #elif new_files_per_rank > 10000: - # new_file_per_rank = 10000 - - out_file = f"{log_dir}/mdtest_output_{tmp_node}_nodes_{ranks_per_node}_ranks_{new_files_per_rank}_new_files_per_rank_timed" - - mdtest_obj_dict[f"{tmp_node}_nodes_{ranks_per_node}_ranks_{new_files_per_rank}_new_files_per_rank"] = handler_class.mdtestTool() - mdtest_obj_dict[f"{tmp_node}_nodes_{ranks_per_node}_ranks_{new_files_per_rank}_new_files_per_rank"].setup_command(config=config, config_file=f"{PyBench_root_dir}/{args['config']}", mpi_ranks=f"{tmp_rank}", files_per_rank=f"{new_files_per_rank}", directory=f"{directory}", output_file=out_file, ranks_per_node=f"{ranks_per_node}", write_output=args['write_output']) - with 
open(f"{command_log_dir}/mdtest_command_{tmp_node}_nodes_{ranks_per_node}_ranks_{new_files_per_rank}_new_files_per_rank_timed", 'a') as file: - file.write(f"The following is the mdtest command") - tmp_cmd_string = "" - for cmd_el in mdtest_obj_dict[f"{tmp_node}_nodes_{ranks_per_node}_ranks_{new_files_per_rank}_new_files_per_rank"].command: - tmp_cmd_string += f" {cmd_el}" - file.write(tmp_cmd_string) - - mdtest_obj_dict[f"{tmp_node}_nodes_{ranks_per_node}_ranks_{new_files_per_rank}_new_files_per_rank"].run() - old_elapsed_time = elapsed_time - start_time, end_time, elapsed_time = benchmark_tools.mdtest_start_end_elapsed_time(out_file) - - print (f"entered the optimizer. Old elapsed time: {old_elapsed_time}, New elapsed time: {elapsed_time}, old files_per_rank {files_per_rank}, new files per rank {new_files_per_rank}, multiple is: {multiple}") - files_per_rank = new_files_per_rank - if files_per_rank < 10: - files_per_rank = 10 - #elif files_per_rank > 10000: - # files_per_rank = 10000 - total_files_optimized = files_per_rank * tmp_rank - print(f"mdtest job {tmp_node}_nodes_{ranks_per_node}_ranks_{files_per_rank}_files_per_rank is finished. [s-{start_time}], [e-{end_time}], elapsed time: {elapsed_time}") - - sys.stdout.flush() diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 276a9df..0000000 --- a/requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -cryptography==42.0.8 -matplotlib==3.9.2 -pandas==2.2.2 -paramiko==3.4.0 -psutil==6.0.0 -PyYAML diff --git a/python_runs/IOR_config/example_read.yml b/src/BenchmarkToolkit/IOR_config/example_read.yml similarity index 100% rename from python_runs/IOR_config/example_read.yml rename to src/BenchmarkToolkit/IOR_config/example_read.yml diff --git a/python_runs/IOR_config/example_write.yml b/src/BenchmarkToolkit/IOR_config/example_write.yml similarity index 100% rename from python_runs/IOR_config/example_write.yml rename to src/BenchmarkToolkit/IOR_config/example_write.yml diff --git a/python_runs/IOR_config/read.yml b/src/BenchmarkToolkit/IOR_config/read.yml similarity index 100% rename from python_runs/IOR_config/read.yml rename to src/BenchmarkToolkit/IOR_config/read.yml diff --git a/python_runs/IOR_config/write.yml b/src/BenchmarkToolkit/IOR_config/write.yml similarity index 100% rename from python_runs/IOR_config/write.yml rename to src/BenchmarkToolkit/IOR_config/write.yml diff --git a/src/BenchmarkToolkit/IOR_wrapper.py b/src/BenchmarkToolkit/IOR_wrapper.py new file mode 100644 index 0000000..6593aef --- /dev/null +++ b/src/BenchmarkToolkit/IOR_wrapper.py @@ -0,0 +1,111 @@ +import os +from . import handler_class +import json +import sys +from . import benchmark_tools +from . 
import miscellaneous +import re + + +def wrap_IOR(args): + job_number = args["slurm_job_number"] + + log_dir = f"results/iortest/{args['io_type']}/{args['platform_type']}/{job_number}" + command_log_dir = f"{log_dir}/commands" + + mpi_ranks = list( + benchmark_tools.split_arg_sequence(args["mpi_ranks"], "--mpi-ranks") + ) + filename = args["testFile"] + node_count = list( + benchmark_tools.split_arg_sequence(args["node_count"], "--node-count") + ) + block_size = args["block_size"] + transfer_size = args["transfer_size"] + segment_count = args["segment_count"] + if "output_format" in args.keys(): + output_format = args["output_format"] + else: + # RB: FIXME: No clue what the default should be here + output_format = "json" + if "deadline_for_stonewalling" in args.keys(): + deadline_for_stonewalling = args["deadline_for_stonewalling"] + else: + deadline_for_stonewalling = 0 + + # total_ranks = mpi_ranks[0]*node_count[0] + + # output_file = f"{log_dir}/ranks_per_node_{mpi_ranks[0]}_node_count_{node_count[0]}" + io_type = args["io_type"] + if "use_existing_file" in args.keys(): + use_existing_file = args["use_existing_file"] + else: + use_existing_file = False + + if "job_note" in args.keys(): + with open(f"{log_dir}/job_note.txt", "w") as file: + file.write(args["job_note"]) + + ior_obj_dict = {} + + print(f"SEQUENCES ARE {mpi_ranks} and nodes {node_count}") + for nodes in node_count: + for ranks in mpi_ranks: + print( + f"BEGINNING OF LOOPS _---------------------- {ranks} and nodes {nodes}" + ) + if "unit_restart" in args: + if args["unit_restart"] == 1: + pattern = "/" + split_dir = re.split(pattern, filename) + cephtest_root = "/" + split_dir[1] + "/" + split_dir[2] + miscellaneous.restart_ceph_unit(cephtest_root) + + total_ranks = ranks * nodes + output_file = f"{log_dir}/ranks_per_node_{ranks}_node_count_{nodes}" + + ior_obj_dict[f"{ranks}_{nodes}"] = handler_class.newIORTool() + ior_obj_dict[f"{ranks}_{nodes}"].setup_command( + config_file=f"{args['config']}", + mpi_ranks=total_ranks, + filename=filename, + ranks_per_node=ranks, + block_size=block_size, + transfer_size=transfer_size, + segment_count=segment_count, + reorder_tasks=1, + fsync=1, + output_file=f"{output_file}", + output_format=output_format, + deadline_for_stonewalling=deadline_for_stonewalling, + io_type=io_type, + use_existing_file=use_existing_file, + ) + + with open(f"{command_log_dir}/command_ior_{ranks}_{nodes}", "w") as file: + file.write("The following is the ior command") + tmp_cmd_string = "" + for cmd_el in ior_obj_dict[f"{ranks}_{nodes}"].command: + tmp_cmd_string += f" {cmd_el}" + file.write(tmp_cmd_string) + + ior_obj_dict[f"{ranks}_{nodes}"].run() + + for nodes in node_count: + for ranks in mpi_ranks: + # Output handling + output_file = f"{log_dir}/ranks_per_node_{ranks}_node_count_{nodes}" + combined_json_log_file = f"{log_dir}/combined_{ranks}_{nodes}_{block_size}" + json_log_file = output_file + + if os.path.exists(json_log_file): + bw, iops = miscellaneous.load_ior_json_results(json_log_file, log_dir) + else: + print(f"JSON LOG FILE NOT FOUND! 
------- {output_file}") + sys.exit() + + data = {"nodes": nodes, "processors": ranks, "bw": bw, "iops": iops} + + with open(combined_json_log_file, "w") as json_file: + json.dump(data, json_file, indent=4) + print(f"Data successfully written to {combined_json_log_file}") diff --git a/src/BenchmarkToolkit/__init__.py b/src/BenchmarkToolkit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/BenchmarkToolkit/analyze_and_rebalance_load.py b/src/BenchmarkToolkit/analyze_and_rebalance_load.py new file mode 100644 index 0000000..1063580 --- /dev/null +++ b/src/BenchmarkToolkit/analyze_and_rebalance_load.py @@ -0,0 +1,4 @@ +def log_and_analyze_data_points(log_dir, fio_object): + # Let's try rate-limiting to see how that effects start and end times. + print(log_dir) + print(fio_object) diff --git a/python_runs/archive/Plot_side_to_side_PoC.py b/src/BenchmarkToolkit/archive/Plot_side_to_side_PoC.py similarity index 100% rename from python_runs/archive/Plot_side_to_side_PoC.py rename to src/BenchmarkToolkit/archive/Plot_side_to_side_PoC.py diff --git a/python_runs/archive/gpfs_test.yml b/src/BenchmarkToolkit/archive/gpfs_test.yml similarity index 100% rename from python_runs/archive/gpfs_test.yml rename to src/BenchmarkToolkit/archive/gpfs_test.yml diff --git a/python_runs/archive/plot_plus_new_parse_func.py b/src/BenchmarkToolkit/archive/plot_plus_new_parse_func.py similarity index 100% rename from python_runs/archive/plot_plus_new_parse_func.py rename to src/BenchmarkToolkit/archive/plot_plus_new_parse_func.py diff --git a/python_runs/archive/test_ior_log_combination.py b/src/BenchmarkToolkit/archive/test_ior_log_combination.py similarity index 100% rename from python_runs/archive/test_ior_log_combination.py rename to src/BenchmarkToolkit/archive/test_ior_log_combination.py diff --git a/src/BenchmarkToolkit/args_handler.py b/src/BenchmarkToolkit/args_handler.py new file mode 100644 index 0000000..9538d91 --- /dev/null +++ b/src/BenchmarkToolkit/args_handler.py @@ -0,0 +1,201 @@ +import argparse +import yaml +import sys + +# slurm job number +# triple replicated vs EC63 - FIO directory option +# block size? Multiple or single, either way need to decide which to test +# number of jobs? Not as an argument. At least not yet +# job length maybe +# Sequential vs random + + +def handle_arguments(): + parser = argparse.ArgumentParser( + description="This script wraps FIO and facilitates long-running variable testing on an FS." + ) + + # universal + parser.add_argument("--config", type=str, help="Path to the YAML config file.") + parser.add_argument( + "--slurm-job-number", + type=int, + help="Slurm job number this script is running under", + ) + parser.add_argument( + "--directory", + type=str, + help="Directory to run the test in. This is where the test files will be created.", + ) + parser.add_argument( + "--first-node", + type=str, + help="The first node in the node list. Will execute some preperatory steps on this node", + ) + parser.add_argument("--benchmark", type=str, help="The benchmark you want to run.") + parser.add_argument( + "--interface-name", + type=str, + help="The interface you want to monitor for inbound and outbound counters", + ) + parser.add_argument( + "--total-node-count", type=str, help="The total count of nodes in the job" + ) + parser.add_argument( + "--unit-restart", type=bool, help="Restart systemd unit (assumably ceph)" + ) + parser.add_argument( + "--node-count", + type=str, + help="Sequence of nodes that the benchmark should run with. 
e.g '1,2,4,6,8,10'", + ) + parser.add_argument("--job-note", type=str, help="insert a note for the job") + parser.add_argument( + "--wait-for-others", + type=bool, + help="True if nodes should wait for each other to finish iterations, false if not (1 or 0)", + ) + parser.add_argument( + "--in-parts", + type=bool, + help="True if the sequences of benchmark arguments should be run iteratively. This usually means there will be multiple log files which will need to be taken into account in the parsing & plotting steps.", + ) + + # ior portion + parser.add_argument( + "--testFile", type=str, help="File/directory to run the IOR test suite on" + ) + parser.add_argument("--transfer-size", type=str, help="transfer size") + parser.add_argument("--segment-count", type=str, help="segment count") + parser.add_argument("--reorder-tasks", type=str, help="reorder tasks") + parser.add_argument("--fsync", type=str, help="fsync") + parser.add_argument("--output-file", type=str, help="output file") + parser.add_argument("--output-format", type=str, help="output format") + parser.add_argument( + "--deadline-for-stonewalling", + type=int, + help="Run IOR in timed mode instead of an indefinite time. All ranks stop at the same time.", + ) + parser.add_argument("--use-existing-file", type=bool, help="Use existing test file") + + # mdtest portion + parser.add_argument( + "--mpi-ranks", type=str, help="Number of MPI ranks per node to use" + ) + parser.add_argument( + "--files-per-rank", type=str, help="Number of files to create per rank (mdtest)" + ) + parser.add_argument( + "--test-repetition", + type=str, + help="Number of times to repeat each test (mdtest)", + ) + parser.add_argument( + "--offset", + type=str, + help="Should there be a node offset? (if yes, 1, else ommit flag) (mdtest)", + ) + parser.add_argument( + "--write-data", + type=str, + help="Should mdtest write data into the files? Either 0 for no or a number of bytes (mdtest)", + ) + parser.add_argument( + "--read-data", + type=str, + help="Should mdtest read data from the files? Either 0 for no or a number of bytes (mdtest)", + ) + parser.add_argument( + "--timed", + type=str, + help="Specify the lower bound and upper bound of the time that the test should run for. Avoid values too close together. Units are seconds.", + ) + + # fio portion + parser.add_argument( + "--file-size", + type=str, + help="Specify the size of the file FIO should write out (per process)", + ) + parser.add_argument( + "--block-size", type=str, help="Block size that FIO should read/write at." + ) + parser.add_argument( + "--job-number", + type=str, + help="Number or sequence of number of jobs per node that FIO should run. e.g '1,5,10,15'. This is per node count in --node-count", + ) + parser.add_argument( + "--time", type=int, help="Number of seconds that FIO should run for." + ) + parser.add_argument( + "--io-type", + type=str, + help="write, read, randwrite, randread, among others. Which IO type should FIO issue?", + ) + parser.add_argument( + "--platform-type", + type=str, + help="Which platform are we using? 
This will decide output file path as well.", + ) + parser.add_argument( + "--split-hosts-file", + type=bool, + help="Should the wrapper split the original hosts file into subsections for the different iterations?", + ) + parser.add_argument( + "--hosts-file", + type=str, + help="Path to the intial hosts file which contains all hosts (At least FIO servers) involved.", + ) + parser.add_argument( + "--no-scrub", + type=bool, + help="(Ceph only) set noscrub and nodeepscrub flags on the ceph system. Requires passwordless SSH to the Ceph servers", + ) + parser.add_argument( + "--template-path", type=str, help="The path to the FIO template" + ) + + args = parser.parse_args() + args_dict = vars(args) + + config_dict = {} + + if args.config: + with open(args.config, "r") as file: + config_dict = yaml.safe_load(file) + + # Merge config and inline arguments, giving precedence to inline arguments + merged_dict = { + **config_dict, + **{k: v for k, v in args_dict.items() if v is not None}, + } + + # Set defaults if not provided + merged_dict.setdefault("time", 300) + merged_dict.setdefault("no_scrub", 0) + merged_dict.setdefault("split_hosts_file", False) + merged_dict.setdefault("interface_name", "") + merged_dict.setdefault("write_data", "0") + merged_dict.setdefault("read_data", "0") + merged_dict.setdefault("wait_for_others", 1) + + # Check for required arguments + # Trying a run without a hosts file to see if independent runs work + # required_args = ['block_size', 'directory', 'io_type', 'platform_type', 'job_number', 'node_count', 'hosts_file', 'template_path'] + # required_args = ['block_size', 'directory', 'io_type', 'platform_type', 'job_number', 'node_count', 'template_path', 'benchmark'] + required_args = ["io_type", "platform_type", "benchmark"] + missing_args = [ + arg + for arg in required_args + if arg not in merged_dict or merged_dict[arg] is None + ] + + # print (f"{merged_dict['write_data']} {merged_dict['write_data']}") + if missing_args and not merged_dict["not_taken_into_account"]["in_parts"]: + print(f"Error: Missing required arguments: {', '.join(missing_args)}") + sys.exit(1) + + # print(merged_dict) + return merged_dict diff --git a/python_runs/benchmark_tools.py b/src/BenchmarkToolkit/benchmark_tools.py similarity index 62% rename from python_runs/benchmark_tools.py rename to src/BenchmarkToolkit/benchmark_tools.py index a08f22e..f1eb5b1 100644 --- a/python_runs/benchmark_tools.py +++ b/src/BenchmarkToolkit/benchmark_tools.py @@ -1,48 +1,34 @@ -import os,sys +import sys from datetime import datetime import re + def count_lines(filename): - with open(filename, 'r') as file: + with open(filename, "r") as file: line_count = 0 - for line in file: + for _ in file: line_count += 1 return line_count -def create_node_list_file(node_string, filename, root_dir, job_num): - + +def create_node_list_file(node_string, filename, job_num): node_list = [] node_count_list = [] - - node_count_list = split_arg_sequence(str(node_string), '--split-host-file') - with open(filename, 'r') as file: + node_count_list = split_arg_sequence(str(node_string), "--split-host-file") + + with open(filename, "r") as file: for node_name in file: stripped_name = node_name.strip() node_list.append(stripped_name) for count in node_count_list: - i=0 - with open(f"{root_dir}/host_files/{job_num}_{count}_hosts.file", 'a') as file: + i = 0 + with open(f"host_files/{job_num}_{count}_hosts.file", "a") as file: while i < count: file.write(f"{node_list[i]}\n") i += 1 -''' -def create_list_from_string_seq(string, filename, 
root_dir, job_num): - - node_list = [] - node_count_list = [] - - node_count_list = split_arg_sequence(str(string), '--split-host-file') - - with open(filename, 'r') as file: - for node_name in file: - stripped_name = node_name.strip() - node_list.append(f"{stripped_name}") - - return node_list -''' def split_arg_sequence(sequence, arg): sequence_list = [] @@ -55,7 +41,9 @@ def split_arg_sequence(sequence, arg): else: sequence_list = [int(sequence_str)] except ValueError as ve: - print(f"ValueError: {ve}. Please ensure the input string for {arg} contains only numbers separated by commas.") + print( + f"ValueError: {ve}. Please ensure the input string for {arg} contains only numbers separated by commas." + ) sys.exit(1) except Exception as e: print(f"An unexpected error occurred: {e} \n {arg}") @@ -63,6 +51,7 @@ def split_arg_sequence(sequence, arg): return sequence_list + def split_block_size_sequence(sequence, arg): sequence_list = [] @@ -74,7 +63,9 @@ def split_block_size_sequence(sequence, arg): else: sequence_list = [sequence_str] except ValueError as ve: - print(f"ValueError: {ve}. Please ensure the input string for {arg} contains only numbers separated by commas.") + print( + f"ValueError: {ve}. Please ensure the input string for {arg} contains only numbers separated by commas." + ) sys.exit(1) except Exception as e: print(f"An unexpected error occurred: {e} \n {arg}") @@ -82,28 +73,28 @@ def split_block_size_sequence(sequence, arg): return sequence_list + def mdtest_start_end_elapsed_time(log_file): time_format = "%m/%d/%Y %H:%M:%S" try: - with open(log_file, 'r') as file: + with open(log_file, "r") as file: for line in file: - #print(f"This is the line: {line}") + # print(f"This is the line: {line}") if "started" in line: - #print(re.split(' ', line)) + # print(re.split(' ', line)) time_string = f"{re.split(' ', line)[3]} {re.split(' ', line)[4]}" - start_time_init = datetime.strptime(time_string, time_format) + start_time_init = datetime.strptime(time_string, time_format) start_time = int(start_time_init.timestamp()) - + if "finished" in line: - #print(re.split(' ', line)) + # print(re.split(' ', line)) time_string = f"{re.split(' ', line)[3]} {re.split(' ', line)[4]}" finish_time_init = datetime.strptime(time_string, time_format) finish_time = int(finish_time_init.timestamp()) + elapsed_time = finish_time - start_time + + return start_time, finish_time, elapsed_time except FileNotFoundError: print(f"{log_file} Not Found!") - sys.exit - - elapsed_time = finish_time - start_time - - return start_time, finish_time, elapsed_time + sys.exit() diff --git a/python_runs/count_lines_in_uncombined.py b/src/BenchmarkToolkit/count_lines_in_uncombined.py similarity index 53% rename from python_runs/count_lines_in_uncombined.py rename to src/BenchmarkToolkit/count_lines_in_uncombined.py index f92ae7b..b7918ff 100644 --- a/python_runs/count_lines_in_uncombined.py +++ b/src/BenchmarkToolkit/count_lines_in_uncombined.py @@ -3,20 +3,22 @@ import time from datetime import datetime + def get_block_size(file_path): stats = os.statvfs(file_path) return stats.f_bsize + def count_lines_in_file_direct(file_path): try: # Get the filesystem block size block_size = get_block_size(file_path) - + # Open the file with O_DIRECT flag - fd = os.open(file_path, os.O_RDONLY | os.O_DIRECT) + fd = os.open(file_path, os.O_RDONLY | os.O_DIRECT) # Pre-allocate a bytearray with the block size buffer = bytearray(block_size) - + line_count = 0 while True: # Read from the file into the buffer @@ -24,31 +26,38 @@ def 
count_lines_in_file_direct(file_path): if not n: break # Copy the read data into the bytearray - buffer[:len(n)] = n - + buffer[: len(n)] = n + # Convert buffer to string and count the lines - data = buffer[:len(n)].decode('utf-8') - line_count += data.count('\n') - + data = buffer[: len(n)].decode("utf-8") + line_count += data.count("\n") + # Close the file descriptor os.close(fd) return line_count except FileNotFoundError: return 0 -def wait_until_line_count_is_node_count(file_path, hostname, node_count, total_intervals, check_interval=5): + +def wait_until_line_count_is_node_count( + file_path, hostname, node_count, total_intervals, check_interval=5 +): wait_time = 0 while True: line_count = count_lines_in_file_direct(file_path) - print(f"{datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] Current line count is {line_count}. Filename ({file_path}) Waiting...") + print( + f"{datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] Current line count is {line_count}. Filename ({file_path}) Waiting..." + ) if line_count > node_count: - print (f"Line count in intermediate results file is higher than the node count, possible duplicate reporting. This could be a result of using the same output directory more than once... File ({file_path})") + print( + f"Line count in intermediate results file is higher than the node count, possible duplicate reporting. This could be a result of using the same output directory more than once... File ({file_path})" + ) sys.exit() if int(line_count) <= int(node_count): - #print(f"Line count is {line_count} and type is {type(line_count)}, node count is {node_count} and type is {type(node_count)}") + # print(f"Line count is {line_count} and type is {type(line_count)}, node count is {node_count} and type is {type(node_count)}") found = 0 - with open (file_path, 'r') as file: + with open(file_path, "r") as file: lines = file.readlines() for i in lines: @@ -60,16 +69,17 @@ def wait_until_line_count_is_node_count(file_path, hostname, node_count, total_i if found == 0: return 0 if found > 1: - print(f"{datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] hostname found more than once in file!") + print( + f"{datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] hostname found more than once in file!" + ) sys.exit() if found == 1 and line_count == node_count: return 1 time.sleep(check_interval) - wait_time += 1 + wait_time += 1 if wait_time >= total_intervals: - print(f"{datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] Waited too long for uncombined to have the correct number of lines. Jobs and nodes are out of sync by over 40 minutes") + print( + f"{datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] Waited too long for uncombined to have the correct number of lines. Jobs and nodes are out of sync by over 40 minutes" + ) sys.exit(1) - - print(f"{datetime.now().strftime('%b %d %H:%M:%S')} [{hostname}] uncombined file has reached {node_count} lines. 
Moving onto next job...") - diff --git a/python_runs/execute_ssh.py b/src/BenchmarkToolkit/execute_ssh.py similarity index 81% rename from python_runs/execute_ssh.py rename to src/BenchmarkToolkit/execute_ssh.py index 43e4231..16a5c5d 100644 --- a/python_runs/execute_ssh.py +++ b/src/BenchmarkToolkit/execute_ssh.py @@ -1,5 +1,6 @@ import paramiko + def execute_ssh_command(hostname, username, command): # Create SSH client ssh_client = paramiko.SSHClient() @@ -7,19 +8,20 @@ def execute_ssh_command(hostname, username, command): # Automatically add host keys ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + output = "" try: # Connect to the SSH server ssh_client.connect(hostname, username=username) # Execute the command - stdin, stdout, stderr = ssh_client.exec_command(command) + _, stdout, _ = ssh_client.exec_command(command) # Read the output output = stdout.read().decode().strip() # Print output - #print("Output of the command:") - #print(output) + # print("Output of the command:") + # print(output) except paramiko.AuthenticationException: print("Authentication failed.") @@ -28,5 +30,5 @@ def execute_ssh_command(hostname, username, command): finally: # Close the SSH connection ssh_client.close() - - return output \ No newline at end of file + + return output diff --git a/python_runs/handler_class.py b/src/BenchmarkToolkit/handler_class.py similarity index 60% rename from python_runs/handler_class.py rename to src/BenchmarkToolkit/handler_class.py index 5bcc792..bc053a3 100644 --- a/python_runs/handler_class.py +++ b/src/BenchmarkToolkit/handler_class.py @@ -1,17 +1,13 @@ import subprocess -import yaml -import json from abc import ABC, abstractmethod import time import os -from execute_ssh import execute_ssh_command +from .execute_ssh import execute_ssh_command import re import shlex from datetime import datetime -import time import sys -import psutil -import threading + class BenchmarkTool(ABC): def __init__(self): @@ -22,12 +18,13 @@ def __init__(self): self.elapsed_time = 0 self.start_times = [] self.end_times = [] - + @abstractmethod def setup_command(self, **params): """Setup the benchmark command with specific parameters.""" - self.params = params # Ensure params are stored whenever setup_command is called - pass + self.params = ( + params # Ensure params are stored whenever setup_command is called + ) @abstractmethod def parse_output(self, output): @@ -35,80 +32,100 @@ def parse_output(self, output): pass def set_noscrub(self): - noScrub = 0 noDeepScrub = 0 - - initial_output = execute_ssh_command('cephmon900', 'ceph', 'ceph status | grep flags' ) - print( initial_output) + + initial_output = execute_ssh_command( + "cephmon900", "ceph", "ceph status | grep flags" + ) + print(initial_output) if initial_output: - result = re.split(r'[,\s]+', initial_output) - + result = re.split(r"[,\s]+", initial_output) + for res in result: if res == "noscrub": noScrub = 1 if res == "nodeep-scrub": noDeepScrub = 1 - + if noScrub == 0 or noDeepScrub == 0: - print (f"one of the flags was unset (deep = {noDeepScrub}, scrub = {noScrub}), ensuring both are set...") - - #do setting work here - print( execute_ssh_command('cephmon900', 'ceph', 'ceph osd set noscrub' ) ) - print( execute_ssh_command('cephmon900', 'ceph', 'ceph osd set nodeep-scrub' ) ) + print( + f"one of the flags was unset (deep = {noDeepScrub}, scrub = {noScrub}), ensuring both are set..." 
+ ) + + # do setting work here + print(execute_ssh_command("cephmon900", "ceph", "ceph osd set noscrub")) + print( + execute_ssh_command( + "cephmon900", "ceph", "ceph osd set nodeep-scrub" + ) + ) else: - print ("noscrub was already set, please ensure this is intended...") + print("noscrub was already set, please ensure this is intended...") else: print("noscrub was not set, ensuring noscrub and nodeep-scrub are set...") - - #do setting work here - print( execute_ssh_command('cephmon900', 'ceph', 'ceph osd set noscrub' ) ) - print( execute_ssh_command('cephmon900', 'ceph', 'ceph osd set nodeep-scrub' ) ) + + # do setting work here + print(execute_ssh_command("cephmon900", "ceph", "ceph osd set noscrub")) + print( + execute_ssh_command("cephmon900", "ceph", "ceph osd set nodeep-scrub") + ) def set_scrub(self): - noScrub = 0 noDeepScrub = 0 - - initial_output = execute_ssh_command('cephmon900', 'ceph', 'ceph status | grep flags' ) + + initial_output = execute_ssh_command( + "cephmon900", "ceph", "ceph status | grep flags" + ) print(initial_output) if initial_output: - result = re.split(r'[,\s]+', initial_output) - + result = re.split(r"[,\s]+", initial_output) + for res in result: if res == "noscrub": noScrub = 1 if res == "nodeep-scrub": noDeepScrub = 1 - + if noScrub == 0 and noDeepScrub == 0: "noscrub was unset before this module completed, please ensure that is intended..." else: print("either noscrub or no-deepscrub is set, moving to unset") - execute_ssh_command('cephmon900', 'ceph', 'ceph osd unset noscrub' ) - execute_ssh_command('cephmon900', 'ceph', 'ceph osd unset nodeep-scrub' ) + execute_ssh_command("cephmon900", "ceph", "ceph osd unset noscrub") + execute_ssh_command("cephmon900", "ceph", "ceph osd unset nodeep-scrub") else: - print ("noscrub was unset before this module completed, please ensure that is intended...") + print( + "noscrub was unset before this module completed, please ensure that is intended..." + ) def run(self): - try: env = os.environ.copy() # Open the output file for appending - if 'write_output' in self.params and self.params['write_output'] == 1: - - with open(self.params['output_file'], 'a') as output_file: + if "write_output" in self.params and self.params["write_output"] == 1: + with open(self.params["output_file"], "a") as output_file: # Start the subprocess with stdout as PIPE to capture output start_time = time.time() - #print (f"This is self.command: {self.command} .. handler_class:101") + # print (f"This is self.command: {self.command} .. handler_class:101") # Run the command and capture output in real-time - process = subprocess.Popen(self.command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=env) + process = subprocess.Popen( + self.command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + env=env, + ) + if not process.stdout: + raise ValueError( + "Process output stream is empty. Check the command or its parameters." 
+ ) # Read and print the output in real-time for line in process.stdout: # Print to terminal - print(line, end='') + print(line, end="") # Write to the output file output_file.write(line) @@ -121,11 +138,13 @@ def run(self): self.elapsed_time = elapsed_time print(datetime.now().time(), f"Time to complete: {elapsed_time}") - + else: # Start the subprocess and wait for it to finish start_time = time.time() - result = subprocess.run(self.command, capture_output=False, text=True, check=True, env=env) + _ = subprocess.run( + self.command, capture_output=False, text=True, check=True, env=env + ) end_time = time.time() elapsed_time = end_time - start_time @@ -139,20 +158,18 @@ def run(self): class test_ior_tool(BenchmarkTool): - def setup_command(self, **params): super().setup_command(**params) self.command = ["mpirun"] - - config_params = params.get('config') - - mpi_ranks = params.get('mpi_ranks') - filename = config_params['filename'] - ranks_per_node = params.get('ranks_per_node') - output_file = params.get('output_file') - output_format = config_params - + + config_params = params.get("config") + + mpi_ranks = params.get("mpi_ranks") + filename = config_params["filename"] + ranks_per_node = params.get("ranks_per_node") + output_file = params.get("output_file") + # Required parameter: output file if mpi_ranks: self.command.extend(["-n", str(mpi_ranks)]) @@ -161,67 +178,79 @@ def setup_command(self, **params): self.command.append("--map-by") self.command.append("node") - + if ranks_per_node: self.command.extend(["-N", str(ranks_per_node)]) - + self.command.append("--verbose") self.command.append("ior") - not_iteratable = ['mpi_ranks', 'node_count', 'filename', 'config_options', 'command_extensions', 'job_note', 'platform_type', 'unit_restart', 'io_type', 'output_file'] + not_iteratable = [ + "mpi_ranks", + "node_count", + "filename", + "config_options", + "command_extensions", + "job_note", + "platform_type", + "unit_restart", + "io_type", + "output_file", + ] if filename: self.command.extend(["-o", str(filename)]) else: raise ValueError("filename must be specified.") - #print(config_params) + # print(config_params) for param, value in config_params.items(): if param not in not_iteratable: self.command.extend([f"-{param}={str(value)}"]) - if param == 'config_options': + if param == "config_options": for key, val in value.items(): self.command.extend([f"-{key}={str(val)}"]) - if param == 'command_extensions': + if param == "command_extensions": for i in value: self.command.extend([f"-{i}"]) if output_file: - self.command.extend(['-O', f"summaryFile={output_file}"]) + self.command.extend(["-O", f"summaryFile={output_file}"]) else: raise ValueError("Output file must be specified") def parse_output(self, output): return "IOR no parsing yet." + class newIORTool(BenchmarkTool): - #pass - #mpirun -n 64 ./ior -t 1m -b 16m -s 16 -F -C -e -o /path/to/TestFile + # pass + # mpirun -n 64 ./ior -t 1m -b 16m -s 16 -F -C -e -o /path/to/TestFile def setup_command(self, **params): super().setup_command(**params) self.command = ["mpirun"] - config_file = params.get('config_file') - + config_file = params.get("config_file") + if config_file: pass else: raise ValueError("Configuration file must be specified. 
IOR...") - - mpi_ranks = params.get('mpi_ranks') - filename = params.get('filename') - ranks_per_node = params.get('ranks_per_node') - block_size = params.get('block_size') - transfer_size = params.get('transfer_size') - segment_count = params.get('segment_count') - reorder_tasks = params.get('reorder_tasks') - fsync = params.get('fsync') - output_file = params.get('output_file') - output_format = params.get('output_format') - deadline_for_stonewalling = params.get('deadline_for_stonewalling') - io_type = params.get('io_type') - use_existing_file = params.get('use_existing_file') + + mpi_ranks = params.get("mpi_ranks") + filename = params.get("filename") + ranks_per_node = params.get("ranks_per_node") + block_size = params.get("block_size") + transfer_size = params.get("transfer_size") + segment_count = params.get("segment_count") + reorder_tasks = params.get("reorder_tasks") + fsync = params.get("fsync") + output_file = params.get("output_file") + output_format = params.get("output_format") + deadline_for_stonewalling = params.get("deadline_for_stonewalling") + io_type = params.get("io_type") + use_existing_file = params.get("use_existing_file") # Required parameter: output file if mpi_ranks: @@ -231,22 +260,22 @@ def setup_command(self, **params): self.command.append("--map-by") self.command.append("node") - + if ranks_per_node: self.command.extend(["-N", str(ranks_per_node)]) - + self.command.append("--verbose") self.command.append("ior") - + if block_size: self.command.extend(["-b", str(block_size)]) - + if transfer_size: self.command.extend(["-t", str(transfer_size)]) if segment_count: self.command.extend(["-s", str(segment_count)]) - + if reorder_tasks: self.command.extend(["-C"]) @@ -254,106 +283,68 @@ def setup_command(self, **params): self.command.extend(["-e"]) if deadline_for_stonewalling != 0: - self.command.extend(['-D', f"{deadline_for_stonewalling}"]) + self.command.extend(["-D", f"{deadline_for_stonewalling}"]) else: pass - if io_type == 'write': - self.command.extend(['-w']) - elif io_type == 'read': - self.command.extend(['-r']) - ''' + if io_type == "write": + self.command.extend(["-w"]) + elif io_type == "read": + self.command.extend(["-r"]) + """ list_of_opts = [ "api", "refNum", "blockSize", "collective", "reorderTasksConstant", "interTestDelay", "deadlineForStonewalling", "fsync", "useExistingTestFile", "scriptFile", "filePerProc", "intraTestBarriers", "setTimeStampSignature", "showHelp", "showHints", "repetitions", "individualDataSets", "outlierThreshold", "setAlignment", "keepFile", "keepFileWithError", "data", "multiFile", "memoryPerNode", "noFill", "numTasks", "testFile", "string", "preallocate", "useSharedFilePointer", "quitOnError", "taskPerNodeOffset", "readFile", "checkRead", "segmentCount", "useStridedDatatype", "transferSize", "maxTimeDuration", "uniqueDir", "hintsFileName", "verbose", "useFileView", "writeFile", "checkWrite", "singleXferAttempt", "reorderTasksRandomSeed", "fsyncPerWrite", "randomOffset", "reorderTasksRandom" ] with open (config_file, 'r') as opts_file: config = yaml.safe_load(opts_file) print(config) - ''' - self.command.extend(['-k']) - #self.command.extend(['-i', '1000']) - #self.command.extend(['-T', '1']) - self.command.extend(['-F']) + """ + self.command.extend(["-k"]) + # self.command.extend(['-i', '1000']) + # self.command.extend(['-T', '1']) + self.command.extend(["-F"]) if use_existing_file is True: - self.command.extend(['-E']) + self.command.extend(["-E"]) # Required parameter: output file - output_file = 
params.get('output_file') + output_file = params.get("output_file") if filename: self.command.extend(["-o", str(filename)]) else: raise ValueError("filename must be specified.") - + if output_file: - self.command.extend(['-O', f"summaryFile={output_file}"]) + self.command.extend(["-O", f"summaryFile={output_file}"]) else: raise ValueError("Output file must be specified") if output_format: - self.command.extend(['-O', f"summaryFormat={output_format}"]) + self.command.extend(["-O", f"summaryFormat={output_format}"]) else: raise ValueError("Output file format must be specified") def parse_output(self, output): return "IOR no parsing yet." -''' -class test_mdtest_tool(BenchmarkTool): - pass - def setup_command(self, **params): - super().setup_command(**params) - - self.command = ["mpirun"] - - config_params = params.get('config') - - mpi_ranks = params.get('mpi_ranks') - ranks_per_node = params.get('ranks_per_node') - files_per_rank = params.get('files_per_rank') - directory = params.get('directory') - - # Required parameter: output file - if mpi_ranks: - self.command.extend(["-n", str(mpi_ranks)]) - else: - raise ValueError("Number of MPI ranks must be specified (--mpi-ranks)") - - self.command.append("--map-by") - self.command.append("node") - - if ranks_per_node: - self.command.extend(["-N", str(ranks_per_node)]) - - self.command.append("--verbose") - self.command.append("mdtest") - - not_iteratable = ['mpi_ranks', 'node_count', 'filename', 'config_options', 'command_extensions', 'job_note', 'platform_type', 'unit_restart', 'io_type', 'output_file', 'timed'] - - #test_repetition = params.get('test_repetition') - #offset = params.get('offset') - #write_into_file = params.get('write_data') - #read_from_file = params.get('read_data') -# pass -''' class mdtestTool(BenchmarkTool): def setup_command(self, **params): super().setup_command(**params) - - config_file = params.get('config_file') + + config_file = params.get("config_file") if config_file: pass else: raise ValueError("Configuration file must be specified. mdtest...") - mpi_ranks = params.get('mpi_ranks') - files_per_rank = params.get('files_per_rank') - directory = params.get('directory') - ranks_per_node = params.get('ranks_per_node') + mpi_ranks = params.get("mpi_ranks") + files_per_rank = params.get("files_per_rank") + directory = params.get("directory") + ranks_per_node = params.get("ranks_per_node") # Required parameter: output file - output_file = params.get('output_file') - config_params = params.get('config') + output_file = params.get("output_file") + config_params = params.get("config") self.command = ["mpirun"] @@ -364,78 +355,94 @@ def setup_command(self, **params): self.command.append("--map-by") self.command.append("node") - + if ranks_per_node: self.command.extend(["-N", str(ranks_per_node)]) - + self.command.append("--verbose") self.command.append("mdtest") - + if files_per_rank: self.command.extend(["-n", str(files_per_rank)]) else: - raise ValueError("Number of files per rank must be specified (--files-per-rank)") + raise ValueError( + "Number of files per rank must be specified (--files-per-rank)" + ) if directory: self.command.extend(["-d", directory]) else: raise ValueError("Directory must be specified. 
(--directory)") - - not_iteratable = ['mpi_ranks', 'directory', 'files_per_rank', 'node_count', 'timed', 'config_options', 'command_extensions', 'job_note', 'platform_type', 'unit_restart', 'io_type', 'output_file', 'write_output', 'in_parts'] + not_iteratable = [ + "mpi_ranks", + "directory", + "files_per_rank", + "node_count", + "timed", + "config_options", + "command_extensions", + "job_note", + "platform_type", + "unit_restart", + "io_type", + "output_file", + "write_output", + "in_parts", + ] for param, value in config_params.items(): if param not in not_iteratable: self.command.extend([f"-{param}={str(value)}"]) - if param == 'config_options': + if param == "config_options": for key, val in value.items(): self.command.extend([f"-{key}={str(val)}"]) - if param == 'command_extensions': + if param == "command_extensions": for i in value: self.command.extend([f"-{i}"]) if output_file: pass - #self.command.extend([">>", str(output_file)]) + # self.command.extend([">>", str(output_file)]) else: raise ValueError("Output file must be specified") def parse_output(self, output): return "mdtest no parsing yet." + class FIOTool(BenchmarkTool): - def setup_command(self, **params): super().setup_command(**params) # Call the base method to store params - + self.command = [ - "fio", - ] - + "fio", + ] + # Required parameter: configuration file - config_file = params.get('config_file') + config_file = params.get("config_file") if config_file: self.command.append(f"{config_file}") else: raise ValueError("Configuration file must be specified") - + # Required parameter: output format - output_format = params.get('output_format') + output_format = params.get("output_format") if output_format: self.command.append(f"--output-format={output_format}") else: raise ValueError("Output format must be specified") - + # Required parameter: output file - output_file = params.get('output_file') + output_file = params.get("output_file") if output_file: self.command.append(f"--output={output_file}") else: raise ValueError("Output file must be specified") - + # Optional parameter: FIO host file - host_file = params.get('host_file') - host_list = params.get('host_list') + host_file = params.get("host_file") + host_list = params.get("host_list") if host_file: self.command.append(f"--client={host_file}") @@ -443,7 +450,18 @@ def setup_command(self, **params): self.command.append(f"--client={host_list}") def parse_output(self, output): - return {job['jobname']: {'Read BW': job['read']['bw'], 'Write BW': job['write']['bw']} for job in output['jobs']} + return { + job["jobname"]: { + "Read BW": job["read"]["bw"], + "Write BW": job["write"]["bw"], + } + for job in output["jobs"] + } + + def __repr__(self): + # FIXME: Return actually useful string + return "FIOTool Command: " + " ".join(self.command) + class IORTool(BenchmarkTool): def __init__(self, slurm_script_path): @@ -454,22 +472,27 @@ def setup_command(self, **params): super().setup_command(**params) # Optionally handle common benchmark parameters # Initialize the command with mpirun - num_procs = params.get('num_procs', 1) + num_procs = params.get("num_procs", 1) self.command = ["mpirun"] - + self.command.append("--map-by") self.command.append("node") - self.command.extend(['-np', str(num_procs)]) - + self.command.extend(["-np", str(num_procs)]) + # Append the IOR executable self.command.append("ior") # Dynamically append other IOR specific parameters - recognized_params = {'transfer_size': '-t', 'block_size': '-b', - 'segment_count': '-s', 'file_per_proc': '-F', - 
'collective': '-c', 'fsync': '-e'} - + recognized_params = { + "transfer_size": "-t", + "block_size": "-b", + "segment_count": "-s", + "file_per_proc": "-F", + "collective": "-c", + "fsync": "-e", + } + for param, flag in recognized_params.items(): value = params.get(param) if value is not None: @@ -481,31 +504,35 @@ def setup_command(self, **params): self.command.extend([flag, str(value)]) # Mandatory test file parameter - if 'test_file' in params: - self.command.extend(["-o", params['test_file']]) + if "test_file" in params: + self.command.extend(["-o", params["test_file"]]) else: raise ValueError("Test file must be specified") def run(self): # Generate the full IOR command - command_str = ' '.join(shlex.quote(arg) for arg in self.command) - + command_str = " ".join(shlex.quote(arg) for arg in self.command) + # Read the Slurm script - with open(self.slurm_script_path, 'r') as file: + with open(self.slurm_script_path, "r") as file: script_content = file.read() # Replace the placeholder with the IOR command - modified_script_content = script_content.replace("<