From e0084597f2eb870b25a0c9bc3a8f0b7109ec94e4 Mon Sep 17 00:00:00 2001 From: antisch Date: Thu, 10 Feb 2022 11:01:55 +1300 Subject: [PATCH 01/14] Draft multiproc support --- .../perfstress_tests/_perf_stress_base.py | 9 +- .../perfstress_tests/_perf_stress_proc.py | 140 ++++++++++++ .../perfstress_tests/_perf_stress_runner.py | 210 +++++++++++------- .../perfstress_tests/_repeated_timer.py | 5 +- 4 files changed, 274 insertions(+), 90 deletions(-) create mode 100644 tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py index 9dc38bc59375..de211c578a7c 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py @@ -6,6 +6,7 @@ import os import abc import threading +import multiprocessing import argparse @@ -106,17 +107,15 @@ class _PerfTestBase(_PerfTestABC): """Base class for implementing a python perf test.""" args = {} - _global_parallel_index_lock = threading.Lock() + _global_parallel_index_lock = multiprocessing.Lock() _global_parallel_index = 0 def __init__(self, arguments): self.args = arguments self._completed_operations = 0 self._last_completion_time = 0.0 - - with _PerfTestBase._global_parallel_index_lock: - self._parallel_index = _PerfTestBase._global_parallel_index - _PerfTestBase._global_parallel_index += 1 + self._parallel_index = _PerfTestBase._global_parallel_index + _PerfTestBase._global_parallel_index += 1 @property def completed_operations(self) -> int: diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py new file mode 100644 index 000000000000..7148ca6057cb --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py @@ -0,0 +1,140 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import asyncio +from typing import List, Optional +import multiprocessing +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed + +from ._perf_stress_base import _PerfTestABC, _PerfTestBase + + +def run_process(index, args, module, test_name, num_tests, test_stages, results, status): + """The child process main function. + + Here we load the test class from the correct module and start it. + """ + test_module = module[0].load_module(module[1]) + test_class = getattr(test_module, test_name) + value = asyncio.run(_start_tests(index, test_class, num_tests, args, index == 0, test_stages, results, status)) + return value + + +def _synchronize(stages, ignore_error=False): + """Synchronize all processes by waiting on the barrier. + + Optionally we can also ignore a broken barrier during the cleanup stages + so that if some processes have failed, the others can still complete cleanup. + """ + try: + stages.wait() + except threading.BrokenBarrierError: + if not ignore_error: + raise + +async def _start_tests(index, test_class, num_tests, args, do_setup, test_stages, results, status): + """Create test classes, run setup, tests and cleanup.""" + # Create all parallel tests with a global unique index value + with _PerfTestBase._global_parallel_index_lock: + _PerfTestBase._global_parallel_index = index + tests = [test_class(args) for _ in range(num_tests)] + try: + # Only the child process with index=0 will run the global setup. + if do_setup: + await tests[0].global_setup() + + # Waiting till all procs are ready to start "Setup". This allows one child + # process to setup any global resources before the rest of setup is run. + _synchronize(test_stages) + await asyncio.gather(*[test.setup() for test in tests]) + + # Waiting till all procs are ready to start "Post Setup" + _synchronize(test_stages) + await asyncio.gather(*[test.post_setup() for test in tests]) + + if args.warmup and not args.profile: + # Waiting till all procs are ready to start "Warmup" + _synchronize(test_stages) + await _run_tests(args.warmup, args, tests, results, status) + + # Waiting till all procs are ready to start "Tests" + _synchronize(test_stages) + await _run_tests(args.duration, args, tests, results, status) + + # Waiting till all procs have finished tests, ready to start "Pre Cleaup" + _synchronize(test_stages) + except threading.BrokenBarrierError: + # A separate process has failed, so all of them are shutting down. + print("Another test process has aborted - shutting down.") + except Exception as e: + print("Test processes failed. Aborting.\n{}".format(e)) + test_stages.abort() + finally: + try: + # We'll attempt to clean up the tests using the barrier. + # This may fail if the tests are already in an unrecoverable error state. + # If one process has failed, we'll still attempt to clean up without the barrier. + await asyncio.gather(*[test.pre_cleanup() for test in tests]) + if not args.no_cleanup: + # Waiting till all procs are ready to start "Cleanup" + # If any process has failed earlier, the barrier will be broken - so wait + # if we can but otherwise attempt to clean up anyway. + _synchronize(test_stages, ignore_error=True) + await asyncio.gather(*[test.cleanup() for test in tests]) + + # Waiting till all processes have completed the cleanup stages. + _synchronize(test_stages, ignore_error=True) + if do_setup and not args.no_cleanup: + await tests[0].global_cleanup() + except Exception as e: + # Tests were unable to clean up, maybe due to earlier failure state. + print("Failed to cleanup up tests: {}".format(e)) + finally: + # Always call close on the tests, even if cleanup failed. + await asyncio.gather(*[test.close() for test in tests]) + + +async def _run_tests(duration: int, args, tests, results, status) -> None: + """Run the listed tests either in parallel asynchronously or in a thread pool.""" + # Kick of a status monitoring thread. + stop_status = threading.Event() + status_thread = threading.Thread( + target=_report_status, + args=(status, tests, stop_status), + daemon=True) + status_thread.start() + + try: + if args.sync: + with ThreadPoolExecutor(max_workers=args.parallel) as ex: + futures = [ex.submit(test.run_all_sync, duration) for test in tests] + for future in as_completed(futures): + future.result() + + else: + tasks = [test.run_all_async(duration) for test in tests] + await asyncio.gather(*tasks) + + # Add final test results to the results queue to be accumulated by the parent process. + for test in tests: + results.put((test._parallel_index, test.completed_operations, test.last_completion_time)) + finally: + # Clean up status reporting thread. + stop_status.set() + status_thread.join() + + +def _report_status(status: multiprocessing.JoinableQueue, tests: List[_PerfTestABC], stop: threading.Event): + """Report ongoing status of running tests. + + This is achieved by adding status to a joinable queue then waiting for that queue to be cleared + by the parent processes. This should implicitly synchronize the status reporting across all child + processes and the parent will dictate the frequence by which status is gathered. + """ + while not stop.is_set(): + for test in tests: + status.put((test._parallel_index, test.completed_operations, test.last_completion_time)) + status.join() diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py index 63e03db17cc3..cd37842249bb 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py @@ -4,26 +4,25 @@ # -------------------------------------------------------------------------------------------- import argparse -import asyncio -import time import inspect import logging import os import pkgutil import sys -from typing import List, Optional -from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import List, Optional, Tuple +import multiprocessing +import threading -from ._perf_stress_base import _PerfTestABC +from ._perf_stress_base import _PerfTestABC, _PerfTestBase from ._batch_perf_test import BatchPerfTest from ._event_perf_test import EventPerfTest from ._perf_stress_test import PerfStressTest from ._repeated_timer import RepeatedTimer +from ._perf_stress_proc import run_process class _PerfStressRunner: def __init__(self, test_folder_path: Optional[str] = None, debug: bool = False): - self._tests: List[_PerfTestABC] = [] self._operation_status_tracker: int = -1 self.logger = logging.getLogger(__name__) @@ -38,16 +37,15 @@ def __init__(self, test_folder_path: Optional[str] = None, debug: bool = False): # NOTE: If you need to support registering multiple test locations, move this into Initialize, call lazily on Run, expose RegisterTestLocation function. self._discover_tests(test_folder_path or os.getcwd()) - self._parse_args() + self._test_name: str = self._parse_args() - def _get_completed_operations(self) -> int: - return sum([t.completed_operations for t in self._tests]) + def _get_completed_operations(self, results: List[Tuple[int, int, float]]) -> int: + return sum([r[1] for r in results]) - def _get_operations_per_second(self) -> float: - test_results = [(t.completed_operations, t.last_completion_time) for t in self._tests] - return sum(map(lambda x: x[0] / x[1] if x[1] else 0, test_results)) + def _get_operations_per_second(self, results: List[Tuple[int, int, float]]) -> float: + return sum(map(lambda x: x[1] / x[2] if x[2] else 0, results)) - def _parse_args(self): + def _parse_args(self) -> str: # First, detect which test we're running. arg_parser = argparse.ArgumentParser( description="Python Perf Test Runner", usage="{} []".format(__file__) @@ -60,8 +58,9 @@ def _parse_args(self): ) args = arg_parser.parse_args(sys.argv[1:2]) + test_name = args.test try: - self._test_class_to_run = self._test_classes[args.test] + self._test_class_to_run = self._test_classes[test_name][0] except KeyError as e: self.logger.error( "Invalid test: {}\n Test must be one of: {}\n".format( @@ -72,7 +71,7 @@ def _parse_args(self): # Next, parse args for that test. We also do global args here too so as not to confuse the initial test parse. per_test_arg_parser = argparse.ArgumentParser( - description=self._test_class_to_run.__doc__ or args.test, usage="{} {} []".format(__file__, args.test) + description=self._test_class_to_run.__doc__ or test_name, usage="{} {} []".format(__file__, args.test) ) # Global args @@ -80,15 +79,10 @@ def _parse_args(self): "-p", "--parallel", nargs="?", type=int, help="Degree of parallelism to run with. Default is 1.", default=1 ) per_test_arg_parser.add_argument( - "-d", "--duration", nargs="?", type=int, help="Duration of the test in seconds. Default is 10.", default=10 + "--processes", nargs="?", type=int, help="Number of concurrent processes over which to distribute the parallel runs. Default is the number of cores.", default=multiprocessing.cpu_count() ) per_test_arg_parser.add_argument( - "-i", - "--iterations", - nargs="?", - type=int, - help="Number of iterations in the main test loop. Default is 1.", - default=1, + "-d", "--duration", nargs="?", type=int, help="Duration of the test in seconds. Default is 10.", default=10 ) per_test_arg_parser.add_argument( "-w", "--warmup", nargs="?", type=int, help="Duration of warmup in seconds. Default is 5.", default=5 @@ -119,6 +113,7 @@ def _parse_args(self): self.logger.info(args) self.logger.info(self.per_test_args) self.logger.info("") + return test_name def _discover_tests(self, test_folder_path): base_classes = [PerfStressTest, BatchPerfTest, EventPerfTest] @@ -128,11 +123,12 @@ def _discover_tests(self, test_folder_path): self.logger.debug("Searching for tests in {}".format(test_folder_path)) # Dynamically enumerate all python modules under the tests path for classes that implement PerfStressTest - for loader, name, _ in pkgutil.walk_packages([test_folder_path]): + for loader, module_name, _ in pkgutil.walk_packages([test_folder_path]): try: - module = loader.find_module(name).load_module(name) + module_loader = loader.find_module(module_name) + module = module_loader.load_module(module_name) except Exception as e: - self.logger.debug("Unable to load module {}: {}".format(name, e)) + self.logger.debug("Unable to load module {}: {}".format(module_name, e)) continue for name, value in inspect.getmembers(module): if name.startswith("_"): @@ -140,69 +136,112 @@ def _discover_tests(self, test_folder_path): if inspect.isclass(value): if issubclass(value, _PerfTestABC) and value not in base_classes: self.logger.info("Loaded test class: {}".format(name)) - self._test_classes[name] = value + self._test_classes[name] = (value, (module_loader, module_name)) + + def _next_stage(self, title: str, track_status: bool = False, report_results: bool = False): + # Wait for previous stage to complete. + self.test_stages.wait() + + # Reset barrier to start next stage. + self.test_stages.reset() + + # Stop any status tracking of the previous stage. + if self.status_thread.is_running: + self.status_thread.stop() + + # If previous stage had results, report. + if report_results: + self._report_results() + + self.logger.info("") + if title: + self.logger.info("=== {} ===".format(title)) + + # If next stage status should be tracked, restart tracker. + if track_status: + self._operation_status_tracker = -1 + self.status_thread.start() async def start(self): - self.logger.info("=== Setup ===") - self._tests = [self._test_class_to_run(self.per_test_args) for _ in range(self.per_test_args.parallel)] + # If unspecified, number of process will be the lesser of number of cores + # and number of parallel tests. + processes = min(self.per_test_args.parallel, self.per_test_args.processes) + + # Evenly divide the number of parallel tests between the processes into a list + # of tuples containing the first parallel index used by each process, and the number + # of threads that will be run by each process. + # E.g. if parallel=10, processes=4: mapping=[(0, 3), (3, 3), (6, 2), (8, 2)] + k, m = divmod(self.per_test_args.parallel, processes) + mapping = [(i*k+min(i, m), ((i+1)*k+min(i+1, m)) - (i*k+min(i, m))) for i in range(processes)] + + self.results = multiprocessing.Queue() + self.status = multiprocessing.JoinableQueue() + self.status_thread = RepeatedTimer(1, self._print_status, start_now=False) + + # The barrier will synchronize each child proc with the parent at each stage of the + # the testing run. This prevents one proc from running tests while global resources + # are still being configured or cleaned up. + self.test_stages = multiprocessing.Barrier(processes + 1) try: - try: - await self._tests[0].global_setup() - try: - await asyncio.gather(*[test.setup() for test in self._tests]) - self.logger.info("") - self.logger.info("=== Post Setup ===") - await asyncio.gather(*[test.post_setup() for test in self._tests]) - self.logger.info("") - - if self.per_test_args.warmup and not self.per_test_args.profile: - await self._run_tests("Warmup", self.per_test_args.warmup) - - for i in range(self.per_test_args.iterations): - title = "Test" if self.per_test_args.iterations == 1 else "Test {}".format(i + 1) - await self._run_tests(title, self.per_test_args.duration) - except Exception as e: - self.logger.warn("Exception: " + str(e)) - finally: - self.logger.info("=== Pre Cleanup ===") - await asyncio.gather(*[test.pre_cleanup() for test in self._tests]) - self.logger.info("") - - if not self.per_test_args.no_cleanup: - self.logger.info("=== Cleanup ===") - await asyncio.gather(*[test.cleanup() for test in self._tests]) - except Exception as e: - self.logger.warn("Exception: " + str(e)) - finally: - if not self.per_test_args.no_cleanup: - await self._tests[0].global_cleanup() + futures = [multiprocessing.Process( + target=run_process, + args=( + index, + self.per_test_args, + self._test_classes[self._test_name][1], + self._test_name, + threads, + self.test_stages, + self.results, + self.status), + daemon=True) for index, threads in mapping] + [f.start() for f in futures] + + # All tests wait to start "Setup". + # This allows one proc to finish the "GlobalSetup" before all of them + # start the per-test "Setup". + self._next_stage("Setup") + + # All tests wait to start Post Setup. + self._next_stage("Post Setup") + + # If a warm up is configured, wait will all tests have finished all setup + # stages before beginning "Warmup". + if self.per_test_args.warmup: + self._next_stage("Warmup", track_status=True) + + # Wait will all tests have completed setup and warmup before beginning "Tests". + self._next_stage("Tests", track_status=True, report_results=True) + + # Wait till all tests have completed before benning cleanup and shutdown. + self._next_stage("Pre Cleanup", report_results=True) + + # If cleanup is configured, wait till all tests are ready to begin "Cleanup" + if not self.per_test_args.no_cleanup: + self._next_stage("Cleanup") + + # Wait till all tests have finished cleaning up, this allows one proc to start + # the "GlobalCleanup" which may start pulling down resources. + self._next_stage(None) + + # Close all procs. + [f.join() for f in futures] + + except threading.BrokenBarrierError: + self.logger.warn("Exception: One or more test processes failed and exited.") except Exception as e: self.logger.warn("Exception: " + str(e)) - finally: - await asyncio.gather(*[test.close() for test in self._tests]) - async def _run_tests(self, title: str, duration: int) -> None: - self._operation_status_tracker = -1 - status_thread = RepeatedTimer(1, self._print_status, title) - try: - if self.per_test_args.sync: - with ThreadPoolExecutor(max_workers=self.per_test_args.parallel) as ex: - futures = [ex.submit(test.run_all_sync, duration) for test in self._tests] - for future in as_completed(futures): - future.result() - - else: - tasks = [test.run_all_async(duration) for test in self._tests] - await asyncio.gather(*tasks) - finally: - status_thread.stop() + def _report_results(self): + """Calculate and log the test run results across all child processes""" + operations = [] + while not self.results.empty(): + operations.append(self.results.get()) + total_operations = self._get_completed_operations(operations) self.logger.info("") - self.logger.info("=== Results ===") - - total_operations = self._get_completed_operations() - operations_per_second = self._get_operations_per_second() + operations_per_second = self._get_operations_per_second(operations) if operations_per_second: seconds_per_operation = 1 / operations_per_second weighted_average_seconds = total_operations / operations_per_second @@ -215,14 +254,19 @@ async def _run_tests(self, title: str, duration: int) -> None: self.logger.info("Completed without generating operation statistics.") self.logger.info("") - def _print_status(self, title): + def _print_status(self): + """Print the ongoing status as reported by all child processes""" if self._operation_status_tracker == -1: self._operation_status_tracker = 0 - self.logger.info("=== {} ===\nCurrent\t\tTotal\t\tAverage".format(title)) + self.logger.info("Current\t\tTotal\t\tAverage") - total_operations = self._get_completed_operations() + operations = [] + while not self.status.empty(): + operations.append(self.status.get()) + self.status.task_done() + total_operations = self._get_completed_operations(operations) current_operations = total_operations - self._operation_status_tracker - average_operations = self._get_operations_per_second() + average_operations = self._get_operations_per_second(operations) self._operation_status_tracker = total_operations self.logger.info("{}\t\t{}\t\t{:.2f}".format(current_operations, total_operations, average_operations)) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_repeated_timer.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_repeated_timer.py index 892eabf11c0f..63a8af99e415 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_repeated_timer.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_repeated_timer.py @@ -8,14 +8,15 @@ # Credit to https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds class RepeatedTimer(object): - def __init__(self, interval, function, *args, **kwargs): + def __init__(self, interval, function, *args, start_now: bool = True, **kwargs): self._timer = None self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.is_running = False - self.start() + if start_now: + self.start() def _run(self): self.is_running = False From db1d80ebaf84fb461a7cffaeca7791de96e31568 Mon Sep 17 00:00:00 2001 From: antisch Date: Thu, 10 Feb 2022 16:08:33 +1300 Subject: [PATCH 02/14] Improve barrier reliability --- .../perfstress_tests/_perf_stress_proc.py | 20 +++++++++++-------- .../perfstress_tests/_perf_stress_runner.py | 20 +++++++++++-------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py index 7148ca6057cb..933acda37cfe 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py @@ -4,6 +4,7 @@ # -------------------------------------------------------------------------------------------- import asyncio +import time from typing import List, Optional import multiprocessing import threading @@ -48,30 +49,31 @@ async def _start_tests(index, test_class, num_tests, args, do_setup, test_stages # Waiting till all procs are ready to start "Setup". This allows one child # process to setup any global resources before the rest of setup is run. - _synchronize(test_stages) + _synchronize(test_stages["Setup"]) await asyncio.gather(*[test.setup() for test in tests]) # Waiting till all procs are ready to start "Post Setup" - _synchronize(test_stages) + _synchronize(test_stages["Post Setup"]) await asyncio.gather(*[test.post_setup() for test in tests]) if args.warmup and not args.profile: # Waiting till all procs are ready to start "Warmup" - _synchronize(test_stages) + _synchronize(test_stages["Warmup"]) await _run_tests(args.warmup, args, tests, results, status) # Waiting till all procs are ready to start "Tests" - _synchronize(test_stages) + _synchronize(test_stages["Tests"]) await _run_tests(args.duration, args, tests, results, status) # Waiting till all procs have finished tests, ready to start "Pre Cleaup" - _synchronize(test_stages) + _synchronize(test_stages["Pre Cleanup"]) except threading.BrokenBarrierError: # A separate process has failed, so all of them are shutting down. print("Another test process has aborted - shutting down.") except Exception as e: print("Test processes failed. Aborting.\n{}".format(e)) - test_stages.abort() + for barrier in test_stages.values(): + barrier.abort() finally: try: # We'll attempt to clean up the tests using the barrier. @@ -82,11 +84,11 @@ async def _start_tests(index, test_class, num_tests, args, do_setup, test_stages # Waiting till all procs are ready to start "Cleanup" # If any process has failed earlier, the barrier will be broken - so wait # if we can but otherwise attempt to clean up anyway. - _synchronize(test_stages, ignore_error=True) + _synchronize(test_stages["Cleanup"], ignore_error=True) await asyncio.gather(*[test.cleanup() for test in tests]) # Waiting till all processes have completed the cleanup stages. - _synchronize(test_stages, ignore_error=True) + _synchronize(test_stages["Finished"], ignore_error=True) if do_setup and not args.no_cleanup: await tests[0].global_cleanup() except Exception as e: @@ -134,6 +136,8 @@ def _report_status(status: multiprocessing.JoinableQueue, tests: List[_PerfTestA by the parent processes. This should implicitly synchronize the status reporting across all child processes and the parent will dictate the frequence by which status is gathered. """ + # Delay the start a tiny bit to let the tests reset their status after warmup + time.sleep(1) while not stop.is_set(): for test in tests: status.put((test._parallel_index, test.completed_operations, test.last_completion_time)) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py index cd37842249bb..29f237251657 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py @@ -140,10 +140,7 @@ def _discover_tests(self, test_folder_path): def _next_stage(self, title: str, track_status: bool = False, report_results: bool = False): # Wait for previous stage to complete. - self.test_stages.wait() - - # Reset barrier to start next stage. - self.test_stages.reset() + self.test_stages[title].wait() # Stop any status tracking of the previous stage. if self.status_thread.is_running: @@ -154,8 +151,7 @@ def _next_stage(self, title: str, track_status: bool = False, report_results: bo self._report_results() self.logger.info("") - if title: - self.logger.info("=== {} ===".format(title)) + self.logger.info("=== {} ===".format(title)) # If next stage status should be tracked, restart tracker. if track_status: @@ -181,7 +177,15 @@ async def start(self): # The barrier will synchronize each child proc with the parent at each stage of the # the testing run. This prevents one proc from running tests while global resources # are still being configured or cleaned up. - self.test_stages = multiprocessing.Barrier(processes + 1) + self.test_stages = { + "Setup": multiprocessing.Barrier(processes + 1), + "Post Setup": multiprocessing.Barrier(processes + 1), + "Warmup": multiprocessing.Barrier(processes + 1), + "Tests": multiprocessing.Barrier(processes + 1), + "Pre Cleanup": multiprocessing.Barrier(processes + 1), + "Cleanup": multiprocessing.Barrier(processes + 1), + "Finished": multiprocessing.Barrier(processes + 1) + } try: futures = [multiprocessing.Process( @@ -223,7 +227,7 @@ async def start(self): # Wait till all tests have finished cleaning up, this allows one proc to start # the "GlobalCleanup" which may start pulling down resources. - self._next_stage(None) + self._next_stage("Finished") # Close all procs. [f.join() for f in futures] From 2741c695533d8f37285d61fb3279e0e90938d1ad Mon Sep 17 00:00:00 2001 From: antisch Date: Thu, 10 Feb 2022 16:11:42 +1300 Subject: [PATCH 03/14] Spelling --- .../perfstress_tests/_perf_stress_proc.py | 14 +++++++------- .../perfstress_tests/_perf_stress_runner.py | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py index 933acda37cfe..38740cc46d75 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py @@ -47,25 +47,25 @@ async def _start_tests(index, test_class, num_tests, args, do_setup, test_stages if do_setup: await tests[0].global_setup() - # Waiting till all procs are ready to start "Setup". This allows one child + # Waiting till all processes are ready to start "Setup". This allows one child # process to setup any global resources before the rest of setup is run. _synchronize(test_stages["Setup"]) await asyncio.gather(*[test.setup() for test in tests]) - # Waiting till all procs are ready to start "Post Setup" + # Waiting till all processes are ready to start "Post Setup" _synchronize(test_stages["Post Setup"]) await asyncio.gather(*[test.post_setup() for test in tests]) if args.warmup and not args.profile: - # Waiting till all procs are ready to start "Warmup" + # Waiting till all processes are ready to start "Warmup" _synchronize(test_stages["Warmup"]) await _run_tests(args.warmup, args, tests, results, status) - # Waiting till all procs are ready to start "Tests" + # Waiting till all processes are ready to start "Tests" _synchronize(test_stages["Tests"]) await _run_tests(args.duration, args, tests, results, status) - # Waiting till all procs have finished tests, ready to start "Pre Cleaup" + # Waiting till all processes have finished tests, ready to start "Pre Cleanup" _synchronize(test_stages["Pre Cleanup"]) except threading.BrokenBarrierError: # A separate process has failed, so all of them are shutting down. @@ -81,7 +81,7 @@ async def _start_tests(index, test_class, num_tests, args, do_setup, test_stages # If one process has failed, we'll still attempt to clean up without the barrier. await asyncio.gather(*[test.pre_cleanup() for test in tests]) if not args.no_cleanup: - # Waiting till all procs are ready to start "Cleanup" + # Waiting till all processes are ready to start "Cleanup" # If any process has failed earlier, the barrier will be broken - so wait # if we can but otherwise attempt to clean up anyway. _synchronize(test_stages["Cleanup"], ignore_error=True) @@ -134,7 +134,7 @@ def _report_status(status: multiprocessing.JoinableQueue, tests: List[_PerfTestA This is achieved by adding status to a joinable queue then waiting for that queue to be cleared by the parent processes. This should implicitly synchronize the status reporting across all child - processes and the parent will dictate the frequence by which status is gathered. + processes and the parent will dictate the frequency by which status is gathered. """ # Delay the start a tiny bit to let the tests reset their status after warmup time.sleep(1) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py index 29f237251657..31dc5828286b 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py @@ -218,7 +218,7 @@ async def start(self): # Wait will all tests have completed setup and warmup before beginning "Tests". self._next_stage("Tests", track_status=True, report_results=True) - # Wait till all tests have completed before benning cleanup and shutdown. + # Wait till all tests have completed before beginning cleanup and shutdown. self._next_stage("Pre Cleanup", report_results=True) # If cleanup is configured, wait till all tests are ready to begin "Cleanup" @@ -229,7 +229,7 @@ async def start(self): # the "GlobalCleanup" which may start pulling down resources. self._next_stage("Finished") - # Close all procs. + # Close all processes. [f.join() for f in futures] except threading.BrokenBarrierError: From c5fb13074c787c0c827fd9962f6930ca638869a0 Mon Sep 17 00:00:00 2001 From: antisch Date: Mon, 21 Mar 2022 14:12:10 +1300 Subject: [PATCH 04/14] Run global setup per proc --- .../perfstress_tests/_perf_stress_proc.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py index 38740cc46d75..6204b0d37767 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py @@ -20,7 +20,7 @@ def run_process(index, args, module, test_name, num_tests, test_stages, results, """ test_module = module[0].load_module(module[1]) test_class = getattr(test_module, test_name) - value = asyncio.run(_start_tests(index, test_class, num_tests, args, index == 0, test_stages, results, status)) + value = asyncio.run(_start_tests(index, test_class, num_tests, args, test_stages, results, status)) return value @@ -36,18 +36,17 @@ def _synchronize(stages, ignore_error=False): if not ignore_error: raise -async def _start_tests(index, test_class, num_tests, args, do_setup, test_stages, results, status): +async def _start_tests(index, test_class, num_tests, args, test_stages, results, status): """Create test classes, run setup, tests and cleanup.""" # Create all parallel tests with a global unique index value with _PerfTestBase._global_parallel_index_lock: _PerfTestBase._global_parallel_index = index tests = [test_class(args) for _ in range(num_tests)] try: - # Only the child process with index=0 will run the global setup. - if do_setup: - await tests[0].global_setup() + # Run the global setup once per process. + await tests[0].global_setup() - # Waiting till all processes are ready to start "Setup". This allows one child + # Waiting till all processes are ready to start "Setup". This allows each child # process to setup any global resources before the rest of setup is run. _synchronize(test_stages["Setup"]) await asyncio.gather(*[test.setup() for test in tests]) @@ -89,7 +88,8 @@ async def _start_tests(index, test_class, num_tests, args, do_setup, test_stages # Waiting till all processes have completed the cleanup stages. _synchronize(test_stages["Finished"], ignore_error=True) - if do_setup and not args.no_cleanup: + if not args.no_cleanup: + # Run global cleanup once per process. await tests[0].global_cleanup() except Exception as e: # Tests were unable to clean up, maybe due to earlier failure state. From 44ca895f2867fbde71f3e1e43a1ccf7a54c853e4 Mon Sep 17 00:00:00 2001 From: antisch Date: Mon, 21 Mar 2022 15:49:41 +1300 Subject: [PATCH 05/14] Update cspell --- .vscode/cspell.json | 1 + 1 file changed, 1 insertion(+) diff --git a/.vscode/cspell.json b/.vscode/cspell.json index 3e22b3c2229e..26aa60b7c83a 100644 --- a/.vscode/cspell.json +++ b/.vscode/cspell.json @@ -176,6 +176,7 @@ "ipconfiguration", "ipconfigurations", "iqmp", + "isclass", "iscoroutine", "iscsi", "ivar", From 70e256396525400c40f4ccb9f10933ac7161d538 Mon Sep 17 00:00:00 2001 From: Mike Harder Date: Tue, 26 Apr 2022 11:46:36 -0700 Subject: [PATCH 06/14] Add parameters to SleepTest - Aligns with .NET and supports more scenarios --- .../system_perfstress/sleep_test.py | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/sleep_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/sleep_test.py index 84a171c47e93..ff5ee6afb5c4 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/sleep_test.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/sleep_test.py @@ -12,15 +12,29 @@ # Used for verifying the perf framework correctly computes average throughput across parallel tests of different speed class SleepTest(PerfStressTest): - instance_count = 0 def __init__(self, arguments): super().__init__(arguments) - type(self).instance_count += 1 - self.seconds_per_operation = math.pow(2, type(self).instance_count) + self.seconds_per_operation = (self.args.initial_delay_ms / 1000) * math.pow(self.args.instance_growth_factor, self._parallel_index) def run_sync(self): time.sleep(self.seconds_per_operation) + self.seconds_per_operation *= self.args.iteration_growth_factor async def run_async(self): await asyncio.sleep(self.seconds_per_operation) + self.seconds_per_operation *= self.args.iteration_growth_factor + + @staticmethod + def add_arguments(parser): + super(SleepTest, SleepTest).add_arguments(parser) + parser.add_argument('--initial-delay-ms', nargs='?', type=int, default=1000, help='Initial delay (in milliseconds)') + + # Used for verifying the perf framework correctly computes average throughput across parallel tests of different speed. + # Each instance of this test completes operations at a different rate, to allow for testing scenarios where + # some instances are still waiting when time expires. + parser.add_argument('--instance-growth-factor', nargs='?', type=float, default=1, + help='Instance growth factor. The delay of instance N will be (InitialDelayMS * (InstanceGrowthFactor ^ InstanceCount)).') + + parser.add_argument('--iteration-growth-factor', nargs='?', type=float, default=1, + help='Iteration growth factor. The delay of iteration N will be (InitialDelayMS * (IterationGrowthFactor ^ IterationCount)).') From 073f2821f4994edc6703ff4a2a6aed062ba8766a Mon Sep 17 00:00:00 2001 From: Mike Harder Date: Tue, 26 Apr 2022 11:47:12 -0700 Subject: [PATCH 07/14] Add LogTest to help observe and debug the framework --- .../system_perfstress/log_test.py | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/log_test.py diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/log_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/log_test.py new file mode 100644 index 000000000000..b72d09791e9c --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/log_test.py @@ -0,0 +1,51 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import os +import time +import asyncio + +from azure_devtools.perfstress_tests import PerfStressTest + +# Used for logging every step and property of the perf test +class LogTest(PerfStressTest): + _logged_global_completed_operations = 0 + + start_time = time.time() + + def __init__(self, arguments): + super().__init__(arguments) + self._seconds_per_operation = 1.0 / (self._parallel_index + 1) + self._logged_completed_operations = 0 + self.log("__init__()") + + async def global_setup(self): + await super().global_setup() + self.log("global_setup()") + + async def setup(self): + await super().setup() + self.log("setup()") + + def run_sync(self): + time.sleep(self._seconds_per_operation) + self._logged_completed_operations += 1 + type(self)._logged_global_completed_operations += 1 + + async def run_async(self): + await asyncio.sleep(self._seconds_per_operation) + self._logged_completed_operations += 1 + type(self)._logged_global_completed_operations += 1 + + async def cleanup(self): + await super().cleanup() + self.log(f'cleanup() - Completed Operations: {self._logged_completed_operations}') + + async def global_cleanup(self): + await super().global_cleanup() + self.log(f'global_cleanup() - Global Completed Operations: {self._logged_global_completed_operations}') + + def log(self, message): + print(f'[{(time.time() - type(self).start_time):.3f}] [PID: {os.getpid()}] [Parallel: {self._parallel_index}] {message}') From 259d7e28ffece8f29f7e62ef316f40f96019b9ed Mon Sep 17 00:00:00 2001 From: antisch Date: Tue, 31 May 2022 14:07:47 +1200 Subject: [PATCH 08/14] Subprocess spawn instead of fork --- .../perfstress_tests/_perf_stress_proc.py | 14 +++++---- .../perfstress_tests/_perf_stress_runner.py | 30 ++++++++++--------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py index 6204b0d37767..bc761fc230ef 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py @@ -9,6 +9,7 @@ import multiprocessing import threading from concurrent.futures import ThreadPoolExecutor, as_completed +import importlib from ._perf_stress_base import _PerfTestABC, _PerfTestBase @@ -18,8 +19,8 @@ def run_process(index, args, module, test_name, num_tests, test_stages, results, Here we load the test class from the correct module and start it. """ - test_module = module[0].load_module(module[1]) - test_class = getattr(test_module, test_name) + test_module = importlib.import_module(module) + test_class = getattr(test_module, test_name) value = asyncio.run(_start_tests(index, test_class, num_tests, args, test_stages, results, status)) return value @@ -39,10 +40,13 @@ def _synchronize(stages, ignore_error=False): async def _start_tests(index, test_class, num_tests, args, test_stages, results, status): """Create test classes, run setup, tests and cleanup.""" # Create all parallel tests with a global unique index value - with _PerfTestBase._global_parallel_index_lock: - _PerfTestBase._global_parallel_index = index - tests = [test_class(args) for _ in range(num_tests)] + tests = [] + try: + with _PerfTestBase._global_parallel_index_lock: + _PerfTestBase._global_parallel_index = index + tests = [test_class(args) for _ in range(num_tests)] + # Run the global setup once per process. await tests[0].global_setup() diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py index 31dc5828286b..ad55104aaa42 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py @@ -8,6 +8,7 @@ import logging import os import pkgutil +import importlib import sys from typing import List, Optional, Tuple import multiprocessing @@ -120,13 +121,13 @@ def _discover_tests(self, test_folder_path): self._test_classes = {} if os.path.isdir(os.path.join(test_folder_path, 'tests')): test_folder_path = os.path.join(test_folder_path, 'tests') + sys.path.append(test_folder_path) self.logger.debug("Searching for tests in {}".format(test_folder_path)) # Dynamically enumerate all python modules under the tests path for classes that implement PerfStressTest - for loader, module_name, _ in pkgutil.walk_packages([test_folder_path]): + for _, module_name, _ in pkgutil.walk_packages([test_folder_path]): try: - module_loader = loader.find_module(module_name) - module = module_loader.load_module(module_name) + module = importlib.import_module(module_name) except Exception as e: self.logger.debug("Unable to load module {}: {}".format(module_name, e)) continue @@ -136,7 +137,7 @@ def _discover_tests(self, test_folder_path): if inspect.isclass(value): if issubclass(value, _PerfTestABC) and value not in base_classes: self.logger.info("Loaded test class: {}".format(name)) - self._test_classes[name] = (value, (module_loader, module_name)) + self._test_classes[name] = (value, module_name) def _next_stage(self, title: str, track_status: bool = False, report_results: bool = False): # Wait for previous stage to complete. @@ -170,25 +171,26 @@ async def start(self): k, m = divmod(self.per_test_args.parallel, processes) mapping = [(i*k+min(i, m), ((i+1)*k+min(i+1, m)) - (i*k+min(i, m))) for i in range(processes)] - self.results = multiprocessing.Queue() - self.status = multiprocessing.JoinableQueue() + ctx = multiprocessing.get_context('spawn') + self.results = ctx.Queue() + self.status = ctx.JoinableQueue() self.status_thread = RepeatedTimer(1, self._print_status, start_now=False) # The barrier will synchronize each child proc with the parent at each stage of the # the testing run. This prevents one proc from running tests while global resources # are still being configured or cleaned up. self.test_stages = { - "Setup": multiprocessing.Barrier(processes + 1), - "Post Setup": multiprocessing.Barrier(processes + 1), - "Warmup": multiprocessing.Barrier(processes + 1), - "Tests": multiprocessing.Barrier(processes + 1), - "Pre Cleanup": multiprocessing.Barrier(processes + 1), - "Cleanup": multiprocessing.Barrier(processes + 1), - "Finished": multiprocessing.Barrier(processes + 1) + "Setup": ctx.Barrier(processes + 1), + "Post Setup": ctx.Barrier(processes + 1), + "Warmup": ctx.Barrier(processes + 1), + "Tests": ctx.Barrier(processes + 1), + "Pre Cleanup": ctx.Barrier(processes + 1), + "Cleanup": ctx.Barrier(processes + 1), + "Finished": ctx.Barrier(processes + 1) } try: - futures = [multiprocessing.Process( + futures = [ctx.Process( target=run_process, args=( index, From da330624c93f0c10f1b7a052c7d50667bfcb1ce2 Mon Sep 17 00:00:00 2001 From: antisch Date: Tue, 31 May 2022 16:05:06 +1200 Subject: [PATCH 09/14] Fix status reporting --- .../perfstress_tests/_perf_stress_runner.py | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py index ad55104aaa42..60aa727f5917 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py @@ -174,7 +174,7 @@ async def start(self): ctx = multiprocessing.get_context('spawn') self.results = ctx.Queue() self.status = ctx.JoinableQueue() - self.status_thread = RepeatedTimer(1, self._print_status, start_now=False) + self.status_thread = RepeatedTimer(1, self._print_status, self.per_test_args.parallel, start_now=False) # The barrier will synchronize each child proc with the parent at each stage of the # the testing run. This prevents one proc from running tests while global resources @@ -260,19 +260,24 @@ def _report_results(self): self.logger.info("Completed without generating operation statistics.") self.logger.info("") - def _print_status(self): + def _print_status(self, num_tests): """Print the ongoing status as reported by all child processes""" if self._operation_status_tracker == -1: self._operation_status_tracker = 0 self.logger.info("Current\t\tTotal\t\tAverage") - operations = [] + operations = [None] * num_tests while not self.status.empty(): - operations.append(self.status.get()) - self.status.task_done() - total_operations = self._get_completed_operations(operations) - current_operations = total_operations - self._operation_status_tracker - average_operations = self._get_operations_per_second(operations) + test_status = self.status.get() + operations[test_status[0]] = test_status + if all(operations): + # We only print status stats when all processes have reported + # otherwise we can't correctly calculate the current operations. + total_operations = self._get_completed_operations(operations) + current_operations = total_operations - self._operation_status_tracker + average_operations = self._get_operations_per_second(operations) + + self._operation_status_tracker = total_operations + self.logger.info("{}\t\t{}\t\t{:.2f}".format(current_operations, total_operations, average_operations)) - self._operation_status_tracker = total_operations - self.logger.info("{}\t\t{}\t\t{:.2f}".format(current_operations, total_operations, average_operations)) + self.status.task_done() From 4bb900ef81fe4cb75aef78e1b60d1410d4b66f2c Mon Sep 17 00:00:00 2001 From: antisch Date: Wed, 14 Dec 2022 22:49:08 +1300 Subject: [PATCH 10/14] Improved error handling --- .../perfstress_tests/_perf_stress_base.py | 1 - .../perfstress_tests/_perf_stress_proc.py | 31 ++++++++++++------- .../perfstress_tests/_perf_stress_runner.py | 11 +++++-- 3 files changed, 28 insertions(+), 15 deletions(-) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py index de211c578a7c..c31b3a968bc5 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py @@ -5,7 +5,6 @@ import os import abc -import threading import multiprocessing import argparse diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py index bc761fc230ef..f51ee675e98e 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_proc.py @@ -5,10 +5,11 @@ import asyncio import time -from typing import List, Optional +import queue +from typing import List import multiprocessing import threading -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED import importlib from ._perf_stress_base import _PerfTestABC, _PerfTestBase @@ -74,7 +75,7 @@ async def _start_tests(index, test_class, num_tests, args, test_stages, results, # A separate process has failed, so all of them are shutting down. print("Another test process has aborted - shutting down.") except Exception as e: - print("Test processes failed. Aborting.\n{}".format(e)) + print(f"Test processes failed - aborting. Error: {e}") for barrier in test_stages.values(): barrier.abort() finally: @@ -97,10 +98,13 @@ async def _start_tests(index, test_class, num_tests, args, test_stages, results, await tests[0].global_cleanup() except Exception as e: # Tests were unable to clean up, maybe due to earlier failure state. - print("Failed to cleanup up tests: {}".format(e)) + print(f"Failed to cleanup up tests: {e}") finally: # Always call close on the tests, even if cleanup failed. - await asyncio.gather(*[test.close() for test in tests]) + try: + await asyncio.gather(*[test.close() for test in tests]) + except Exception as e: + print(f"Failed to close tests: {e}") async def _run_tests(duration: int, args, tests, results, status) -> None: @@ -116,13 +120,18 @@ async def _run_tests(duration: int, args, tests, results, status) -> None: try: if args.sync: with ThreadPoolExecutor(max_workers=args.parallel) as ex: - futures = [ex.submit(test.run_all_sync, duration) for test in tests] - for future in as_completed(futures): - future.result() - + tasks = [ex.submit(test.run_all_sync, duration) for test in tests] + wait(tasks, return_when=ALL_COMPLETED) else: - tasks = [test.run_all_async(duration) for test in tests] - await asyncio.gather(*tasks) + tasks = [asyncio.create_task(test.run_all_async(duration)) for test in tests] + await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED) + + # If any of the parallel test runs raised an exception, let it be propagated, after all tasks have + # completed. + # TODO: This will only raise the first Exception encountered. Once we migrate the perf pipelines + # to 3.11 we could refactor to use ExceptionGroups so all exceptions will be captured. + for task in tasks: + task.result() # Add final test results to the results queue to be accumulated by the parent process. for test in tests: diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py index 1e52a8129492..09c35469a9d6 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py @@ -232,13 +232,18 @@ async def start(self): # the "GlobalCleanup" which may start pulling down resources. self._next_stage("Finished") - # Close all processes. - [f.join() for f in futures] - except threading.BrokenBarrierError: self.logger.warn("Exception: One or more test processes failed and exited.") except Exception as e: self.logger.warn("Exception: " + str(e)) + finally: + # Close all processes. + try: + [f.join() for f in futures] + self.status_thread.stop() + except Exception as e: + self.logger.warn("Error closing processes: " + str(e)) + def _report_results(self): """Calculate and log the test run results across all child processes""" From a0044119f744805bec8c9729aa01b4532438f75b Mon Sep 17 00:00:00 2001 From: antisch Date: Fri, 16 Dec 2022 13:30:36 +1300 Subject: [PATCH 11/14] Updated docs --- doc/dev/perfstress_tests.md | 165 +++++++++++++----- .../perfstress_tests/_perf_stress_base.py | 4 +- 2 files changed, 120 insertions(+), 49 deletions(-) diff --git a/doc/dev/perfstress_tests.md b/doc/dev/perfstress_tests.md index e8fb7d40cc08..a2125075f74d 100644 --- a/doc/dev/perfstress_tests.md +++ b/doc/dev/perfstress_tests.md @@ -1,9 +1,11 @@ # Table of Contents 1. [The perfstress framework](#the-perfstress-framework) - - [The PerfStressTest base](#the-perfstresstest-base) + - [The framework baseclasses](#the-provided-baseclasses) + - [PerfStressTest](#the-perfstresstest-baseclass) + - [BatchPerfTest](#the-batchperftest-baseclass) + - [EventPerfTest](#the-eventperftest-baseclass) - [Default command options](#default-command-options) - [Running with test proxy](#running-with-the-test-proxy) - - [The BatchPerfTest base](#the-batchperftest-base) 2. [Adding performance tests to an SDK](#adding-performance-tests-to-an-sdk) - [Writing a test](#writing-a-test) - [Writing a batch test](#writing-a-batch-test) @@ -22,52 +24,69 @@ the tests. To start using the framework, make sure that `azure-devtools` is incl ``` The perfstress framework offers the following: - The `perfstress` commandline tool. -- The `PerfStressTest` baseclass. -- The `BatchPerfTest` baseclass. +- The `PerfStressTest` baseclass (each test run counted as a single data point). +- The `BatchPerfTest` baseclass (each test run counted as 1 or more data points). +- The `EventPerfTest` baseclass (supports a callback based event handler). - Stream utilities for uploading/downloading without storing in memory: `RandomStream`, `AsyncRandomStream`, `WriteStream`. - A `get_random_bytes` utility for returning randomly generated data. - A series of "system tests" to test the perfstress framework along with the performance of the raw transport layers (requests, aiohttp, etc). -## The PerfStressTest base -The `PerfStressTest` base class is what will be used for all perf test implementations. It provides the following API: +## The provided baseclasses: +The perf framework provides three baseclasses to accommodate testing different SDK scenarios. Depending on which baseclass you select, different +methods will need to be implemented. All of them have a common base API (`_PerfTestBase`), defined below: + ```python -class PerfStressTest: +class _PerfTestBase: args = {} # Command line arguments + @property + def completed_operations(self) -> int: + # Total number of operations completed by run_all(). Reset after warmup. + + @property + def last_completion_time(self) -> float: + # Elapsed time between start of warmup/run and last completed operation. Reset after warmup. + def __init__(self, arguments): # The command line args can be accessed on construction. - async def global_setup(self): - # Can be optionally defined. Only run once, regardless of parallelism. + async def global_setup(self) -> None: + # Can be optionally defined. Only run once per process, regardless of multi-threading. + # The baseclasses will also define logic here, so if you override this method, make sure you include a call to super(). - async def global_cleanup(self): - # Can be optionally defined. Only run once, regardless of parallelism. + async def global_cleanup(self) -> None: + # Can be optionally defined. Only run once per process, regardless of multi-threading. + # The baseclasses will also define logic here, so if you override this method, make sure you include a call to super(). - async def post_setup(self): + async def post_setup(self) -> None: # Post-setup called once per parallel test instance. # Used by base classes to setup state (like test-proxy) after all derived class setup is complete. # There should be no need to overwrite this function. - async def pre_cleanup(self): + async def pre_cleanup(self) -> None: # Pre-cleanup called once per parallel test instance. # Used by base classes to cleanup state (like test-proxy) before all derived class cleanup runs. # There should be no need to overwrite this function. - async def setup(self): + async def setup(self) -> None: # Can be optionally defined. Run once per test instance, after global_setup. + # The baseclasses will also define logic here, so if you override this method, make sure you include a call to super(). - async def cleanup(self): + async def cleanup(self) -> None: # Can be optionally defined. Run once per test instance, before global_cleanup. + # The baseclasses will also define logic here, so if you override this method, make sure you include a call to super(). - async def close(self): + async def close(self) -> None: # Can be optionally defined. Run once per test instance, after cleanup and global_cleanup. + # The baseclasses will also define logic here, so if you override this method, make sure you include a call to super(). - def run_sync(self): - # Must be implemented. This will be the perf test to be run synchronously. + def run_all_sync(self, duration: int) -> None: + # Run all sync tests, including both warmup and duration. This method is implemented by the provided base + # classes, there should be no need to overwrite this function. - async def run_async(self): - # Must be implemented. This will be the perf test to be run asynchronously. - # If writing a test for a T1 legacy SDK with no async, implement this method and raise an exception. + async def run_all_async(self, duration: int) -> None: + # Run all async tests, including both warmup and duration. This method is implemented by the provided base + # classes, there should be no need to overwrite this function. @staticmethod def add_arguments(parser): @@ -78,17 +97,92 @@ class PerfStressTest: def get_from_env(variable): # Get the value of an env var. If empty or not found, a ValueError will be raised. ``` +### The PerfStressTest baseclass +This is probably the most common test baseclass, and should be used where each test run represents 1 logical successful result. +For example, 1 successful service request, 1 file uploaded, 1 output downloaded, etc. +Along with the above base API, the following methods will need to be implemented: + +```python +class PerfStressTest: + def run_sync(self) -> None: + # Must be implemented. This will be the perf test to be run synchronously. + + async def run_async(self) -> None: + # Must be implemented. This will be the perf test to be run asynchronously. + # If writing a test for an SDK without async support (e.g. a T1 legacy SDK), implement this method and raise an exception. + +``` +### The BatchPerfTest baseclass +The `BatchPerfTest` class is the parent class of the above `PerfStressTest` class that is further abstracted to allow for more flexible testing of SDKs that don't conform to a 1:1 ratio of operations to results. +This baseclass should be used where each test run represent a more than a single result. For example, results that are uploaded +or downloaded in batches. +Along with the above base API, the following methods will need to be implemented: + +```python +class BatchPerfTest: + def run_batch_sync(self) -> int: + # Run cumultive operation(s) synchronously - i.e. an operation that results in more than a single logical result. + # When inheriting from BatchPerfTest, this method will need to be implemented. + # Must return the number of completed results represented by a single successful test run. + + async def run_batch_async(self) -> int: + # Run cumultive operation(s) asynchronously - i.e. an operation that results in more than a single logical result. + # When inheriting from BatchPerfTest, this method will need to be implemented. + # Must return the number of completed results represented by a single successful test run. + # If writing a test for an SDK without async support (e.g. a T1 legacy SDK), implement this method and raise an exception. + +``` +### The EventPerfTest baseclass +This baseclass should be used when SDK operation to be tested requires starting up a process that acts on events via callback. +Along with the above base API, the following methods will need to be implemented: +```python +class EventPerfTest: + def event_raised_sync(self) -> None: + # This method should not be overwritten, instead it should be called in your sync callback implementation + # to register a single successful event. + + def error_raised_sync(self, error): + # This method should not be overwritten, instead it should be called in your sync callback implementation + # to register a failure in the event handler. This will result in the test being shutdown. + + async def event_raised_async(self): + # This method should not be overwritten, instead it should be called in your async callback implementation + # to register a single successful event. + + async def error_raised_async(self, error): + # This method should not be overwritten, instead it should be called in your async callback implementation + # to register a failure in the event handler. This will result in the test being shutdown. + + def start_events_sync(self) -> None: + # Must be implemented - starts the asynchronous process for receiving events. + # This can be blocking for the duration of the test as it will be run during setup() in a thread. + + def stop_events_sync(self) -> None: + # Stop the sychronous process for receiving events. Must be implemented. Will be called during cleanup. + + async def start_events_async(self) -> None: + # Must be implemented - starts the synchronous process for receiving events. + # This can be blocking for the duration of the test as it will be scheduled in the eventloop during setup(). + + async def stop_events_async(self) -> None: + # Stop the asychronous process for receiving events. Must be implemented. Will be called during cleanup. + +``` + ## Default command options The framework has a series of common command line options built in: - `-d --duration=10` Number of seconds to run as many operations (the "run" function) as possible. Default is 10. -- `-i --iterations=1` Number of test iterations to run. Default is 1. - `-p --parallel=1` Number of tests to run in parallel. Default is 1. +- `--processes=multiprocessing.cpu_count()` Number of concurrent processes that the parallel test runs should be distributed over. This is used + together with `--parallel` to distribute the number of concurrent tests first between available processes, then between threads within each + process. For example if `--parallel=16 --processes=4`, 4 processes will be started, each running 4 concurrent threaded test instances. + Best effort will be made to distribute evenly, for example if `--parallel=10 --processes=4`, 4 processes will be start, two of which run 3 threads, and two that run 2 threads. It's therefore recommended that the value of `parallel` be less than, or a multiple of, the value of `processes`. - `-w --warm-up=5` Number of seconds to spend warming up the connection before measuring begins. Default is 5. - `--sync` Whether to run the tests in sync or async. Default is False (async). - `--no-cleanup` Whether to keep newly created resources after test run. Default is False (resources will be deleted). - `--insecure` Whether to run without SSL validation. Default is False. - `-x --test-proxies` Whether to run the tests against the test proxy server. Specify the URL(s) for the proxy endpoint(s) (e.g. "https://localhost:5001"). Multiple values should be semi-colon-separated. -- `--profile` Whether to run the perftest with cProfile. If enabled (default is False), the output file of a single iteration will be written to the current working directory in the format `"cProfile---.pstats"`. +- `--profile` Whether to run the perftest with cProfile. If enabled (default is False), the output file of a single iteration will be written to the current working directory in the format `"cProfile---.pstats"`. **Note:** The profiler is not currently supported for the `EventPerfTest` baseclass. ## Running with the test proxy Follow the instructions here to install and run the test proxy server: @@ -98,29 +192,6 @@ Once running, in a separate process run the perf test in question, combined with ```cmd (env) ~/azure-storage-blob/tests> perfstress DownloadTest -x "https://localhost:5001" ``` -## The BatchPerfTest base -The `BatchPerfTest` class is the parent class of the above `PerfStressTest` class that is further abstracted to allow for more flexible testing of SDKs that don't conform to a 1:1 ratio of operations to results. -An example of this is a messaging SDK that streams multiple messages for a period of time. -This base class uses the same setup/cleanup/close functions described above, however instead of `run_sync` and `run_async`, it has `run_batch_sync` and `run_batch_async`: -```python -class BatchPerfTest: - - def run_batch_sync(self) -> int: - """ - Run cumultive operation(s) - i.e. an operation that results in more than a single logical result. - :returns: The number of completed results. - :rtype: int - """ - - async def run_batch_async(self) -> int: - """ - Run cumultive operation(s) - i.e. an operation that results in more than a single logical result. - :returns: The number of completed results. - :rtype: int - """ - -``` -An example test case using the `BatchPerfTest` base can be found below. # Adding performance tests to an SDK The performance tests will be in a submodule called `perfstress_tests` within the `tests` directory in an SDK project. @@ -131,7 +202,7 @@ sdk/storage/azure-storage-blob/tests/perfstress_tests This `perfstress_tests` directory is a module, and so must contain an `__init__.py` file. This can be empty. ## Writing a test -To add a test, import and inherit from `PerfStressTest` and populate the functions as needed. +To add a test, import and inherit from one of the provided baseclasses and populate the functions as needed. The name of the class will be the name of the perf test, and is what will be passed into the command line to execute that test. ```python from azure_devtools.perfstress_tests import PerfStressTest diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py index c31b3a968bc5..3d450a83e30b 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py @@ -134,14 +134,14 @@ def last_completion_time(self) -> float: async def global_setup(self) -> None: """ - Setup called once across all parallel test instances. + Setup called once per process across all threaded test instances. Used to setup state that can be used by all test instances. """ return async def global_cleanup(self) -> None: """ - Cleanup called once across all parallel test instances. + Cleanup called once per process across all threaded test instances. Used to cleanup state that can be used by all test instances. """ return From b525ce7688cb1bec38a94f171232a7844282edef Mon Sep 17 00:00:00 2001 From: Anna Tisch Date: Fri, 16 Dec 2022 16:01:15 +1300 Subject: [PATCH 12/14] Apply suggestions from code review Co-authored-by: Paul Van Eck --- doc/dev/perfstress_tests.md | 8 ++++---- .../perfstress_tests/_perf_stress_runner.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/doc/dev/perfstress_tests.md b/doc/dev/perfstress_tests.md index a2125075f74d..d7875d8c81c7 100644 --- a/doc/dev/perfstress_tests.md +++ b/doc/dev/perfstress_tests.md @@ -121,12 +121,12 @@ Along with the above base API, the following methods will need to be implemented ```python class BatchPerfTest: def run_batch_sync(self) -> int: - # Run cumultive operation(s) synchronously - i.e. an operation that results in more than a single logical result. + # Run cumulative operation(s) synchronously - i.e. an operation that results in more than a single logical result. # When inheriting from BatchPerfTest, this method will need to be implemented. # Must return the number of completed results represented by a single successful test run. async def run_batch_async(self) -> int: - # Run cumultive operation(s) asynchronously - i.e. an operation that results in more than a single logical result. + # Run cumulative operation(s) asynchronously - i.e. an operation that results in more than a single logical result. # When inheriting from BatchPerfTest, this method will need to be implemented. # Must return the number of completed results represented by a single successful test run. # If writing a test for an SDK without async support (e.g. a T1 legacy SDK), implement this method and raise an exception. @@ -158,14 +158,14 @@ class EventPerfTest: # This can be blocking for the duration of the test as it will be run during setup() in a thread. def stop_events_sync(self) -> None: - # Stop the sychronous process for receiving events. Must be implemented. Will be called during cleanup. + # Stop the synchronous process for receiving events. Must be implemented. Will be called during cleanup. async def start_events_async(self) -> None: # Must be implemented - starts the synchronous process for receiving events. # This can be blocking for the duration of the test as it will be scheduled in the eventloop during setup(). async def stop_events_async(self) -> None: - # Stop the asychronous process for receiving events. Must be implemented. Will be called during cleanup. + # Stop the asynchronous process for receiving events. Must be implemented. Will be called during cleanup. ``` diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py index 09c35469a9d6..15c4f753124f 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py @@ -213,12 +213,12 @@ async def start(self): # All tests wait to start Post Setup. self._next_stage("Post Setup") - # If a warm up is configured, wait will all tests have finished all setup + # If a warm up is configured, wait till all tests have finished all setup # stages before beginning "Warmup". if self.per_test_args.warmup: self._next_stage("Warmup", track_status=True) - # Wait will all tests have completed setup and warmup before beginning "Tests". + # Wait till all tests have completed setup and warmup before beginning "Tests". self._next_stage("Tests", track_status=True, report_results=True) # Wait till all tests have completed before beginning cleanup and shutdown. From 0dacb82aea15168da544dc89c24daeef4d261cd5 Mon Sep 17 00:00:00 2001 From: antisch Date: Fri, 16 Dec 2022 16:06:15 +1300 Subject: [PATCH 13/14] Cspell fixes --- doc/dev/perfstress_tests.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/doc/dev/perfstress_tests.md b/doc/dev/perfstress_tests.md index d7875d8c81c7..888b48c4cf96 100644 --- a/doc/dev/perfstress_tests.md +++ b/doc/dev/perfstress_tests.md @@ -14,6 +14,8 @@ - [Running the system tests](#running-the-system-tests) 4. [Readme](#readme) +[comment]: # ( cspell:ignore perfstresstest batchperftest ) + # The perfstress framework The perfstress framework has been added to azure-devtools module. The code can be found [here](https://github.com/Azure/azure-sdk-for-python/tree/main/tools/azure-devtools/src/azure_devtools/perfstress_tests). @@ -154,14 +156,14 @@ class EventPerfTest: # to register a failure in the event handler. This will result in the test being shutdown. def start_events_sync(self) -> None: - # Must be implemented - starts the asynchronous process for receiving events. + # Must be implemented - starts the synchronous process for receiving events. # This can be blocking for the duration of the test as it will be run during setup() in a thread. def stop_events_sync(self) -> None: # Stop the synchronous process for receiving events. Must be implemented. Will be called during cleanup. async def start_events_async(self) -> None: - # Must be implemented - starts the synchronous process for receiving events. + # Must be implemented - starts the asynchronous process for receiving events. # This can be blocking for the duration of the test as it will be scheduled in the eventloop during setup(). async def stop_events_async(self) -> None: @@ -289,7 +291,7 @@ class _StorageStreamTestBase(PerfStressTest): super().__init__(arguments) # Any common attributes - self.container_name = 'streamperftests' + self.container_name = 'stream-perf-tests' # Auth configuration connection_string = self.get_from_env("AZURE_STORAGE_CONNECTION_STRING") From dad6643ff763c4176e5b9344afd15ce55cb39f19 Mon Sep 17 00:00:00 2001 From: antisch Date: Sun, 18 Dec 2022 10:42:28 +1300 Subject: [PATCH 14/14] Typo --- doc/dev/perfstress_tests.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/dev/perfstress_tests.md b/doc/dev/perfstress_tests.md index 888b48c4cf96..d41afb8cd2d9 100644 --- a/doc/dev/perfstress_tests.md +++ b/doc/dev/perfstress_tests.md @@ -255,7 +255,7 @@ class ListContainersTest(PerfStressTest): """The synchronous perf test. Try to keep this minimal and focused. Using only a single client API. - Avoid putting any ancilliary logic (e.g. generating UUIDs), and put this in the setup/init instead + Avoid putting any ancillary logic (e.g. generating UUIDs), and put this in the setup/init instead so that we're only measuring the client API call. """ for _ in self.client.list_containers(): @@ -265,7 +265,7 @@ class ListContainersTest(PerfStressTest): """The asynchronous perf test. Try to keep this minimal and focused. Using only a single client API. - Avoid putting any ancilliary logic (e.g. generating UUIDs), and put this in the setup/init instead + Avoid putting any ancillary logic (e.g. generating UUIDs), and put this in the setup/init instead so that we're only measuring the client API call. """ async for _ in self.async_client.list_containers():