# Various stress tests to check that the instance and its services respond adequately

## Inputs and Configuration

In [30]:
# NBVAL_IGNORE_OUTPUT

import os
import random
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from inspect import cleandoc
from threading import Thread

import requests


PAVICS_HOST = os.getenv("PAVICS_HOST", "pavics.ouranos.ca").rstrip("/")
if not PAVICS_HOST:
    raise ValueError("Cannot run test without a PAVICS_HOST value.")

PAVICS_URL = f"https://{PAVICS_HOST}"
VERIFY_SSL = "DISABLE_VERIFY_SSL" not in os.environ  # defining DISABLE_VERIFY_SSL (any value) disables verification
MAGPIE_URL = PAVICS_URL + "/magpie"
TWITCHER_PROXY = "/twitcher/ows/proxy"
# a non-empty TWITCHER_URL overrides the proxy URL derived from PAVICS_HOST
TWITCHER_URL = os.getenv("TWITCHER_URL") or PAVICS_URL + TWITCHER_PROXY

# test config
# Blank entries are dropped (e.g. "finch,,raven" or an empty variable): without this, `"".split(",")` yields `[""]`
# and the emptiness check below could never trigger.
TEST_WPS_BIRDS = [
    bird.strip()
    for bird in os.getenv("TEST_WPS_BIRDS", "finch,flyingpigeon,raven,hummingbird").split(",")
    if bird.strip()
]
if not TEST_WPS_BIRDS:
    raise ValueError("Cannot run test without at least one service in TEST_WPS_BIRDS.")

TEST_MAX_AVG_TIME = float(os.getenv("TEST_MAX_AVG_TIME", 1))  # maximum allowed request seconds on average for success
TEST_MAX_ERR_CODE = int(os.getenv("TEST_MAX_ERR_CODE", 0))     # maximum allowed amount of incorrect request status code
TEST_TIMEOUT_ABORT = int(os.getenv("TEST_TIMEOUT_ABORT", 5))   # maximum timeout duration to wait before abort request
# maximum request timeout retries before bird is aborted
# (TEST_TIMEOUT_RETRY matches the setting name; TEST_ABORT_THRESHOLD is still honored for backward compatibility)
TEST_TIMEOUT_RETRY = int(os.getenv("TEST_TIMEOUT_RETRY", os.getenv("TEST_ABORT_THRESHOLD", 3)))

TEST_RUNS = int(os.getenv("TEST_RUNS", 100))  # number of requests per tested bird
TEST_RUNS_THREDDS = int(os.getenv("TEST_RUNS_THREDDS", 500))  # number of requests for thredds test

TEST_N_THREADS_WPS = int(os.getenv("TEST_N_THREADS_WPS", 3))  # number of threads testing in parallel each bird
TEST_N_THREADS_THREDDS = int(os.getenv("TEST_N_THREADS_THREDDS", 3))  # number of threads testing in parallel thredds


print(f"PAVICS_HOST:    [{PAVICS_HOST}]")
print(f"TWITCHER_URL:   [{TWITCHER_URL}]")
print(f"TEST_WPS_BIRDS: {TEST_WPS_BIRDS}")

PAVICS_HOST:    [pavics.ouranos.ca]
TWITCHER_URL:   [https://pavics.ouranos.ca/twitcher/ows/proxy]
TEST_WPS_BIRDS: ['finch', 'flyingpigeon', 'raven', 'hummingbird']


## Utilities

In [7]:
class Progression:
    """
    Keep track of the amount of requests completed across all stress test threads.

    A single instance is passed to every thread launched by ``run_threads``, so the progression printed
    by any thread reflects the requests completed by all of them.
    """
    count: int = 0
    n_threads: int = 0
    runs_per_threads: int = 0
    total_runs: int = 0

    def __init__(self, n_threads: int, runs_per_threads: int):
        """
        :param n_threads: number of threads
        :param runs_per_threads: number of stress test requests executed by each thread
        """
        self.count = 0
        self.n_threads = n_threads
        self.runs_per_threads = runs_per_threads
        self.total_runs = runs_per_threads * n_threads
        # guards `count`: the read-modify-write in `increase` is not atomic when threads share this instance
        self._lock = threading.Lock()

    def __str__(self):
        # private attributes (the lock) are left out so only the progression values are displayed
        return str({key: value for key, value in self.__dict__.items() if not key.startswith("_")})

    def increase(self, amount: int):
        """
        Add ``amount`` completed requests to the shared count and print the updated progression.

        :param amount: number of newly completed requests
        """
        with self._lock:
            self.count += amount
            count = self.count  # snapshot so the printed value matches this update
        thread_name = threading.current_thread().name
        if thread_name:
            print(f'    {thread_name} --> Progress : {count}/{self.total_runs}   ')
        else:
            print(f'    Progress : {count}/{self.total_runs} ')
            
@dataclass
class StressTestResult:
    """
    Configuration and results of a single ``stress_test_requests`` execution.

    The per-run lists (``codes``, ``delta``, ``times``, ``timestamps``) hold one entry per executed request,
    in request order, and are rendered as a table by ``__str__``.
    """
    code: int = 200          # expected HTTP status code of every request
    runs: int = 0            # number of requested runs
    max_avg_time: float = 0  # maximum allowed average request time (seconds)
    max_err_code: int = 0    # maximum allowed amount of unexpected HTTP status codes
    timeout_abort: int = 0   # per-request timeout (seconds)
    timeout_retry: int = 0   # allowed amount of timed out requests before the test is aborted
    timeout_count: int = 0   # amount of timed out requests
    method: str = "GET"
    url: str = None
    request_args: dict = None
    status: int = 0  # see description of stress-test
    # default_factory gives each instance its own lists (plain class-level lists would be shared by all instances)
    codes: list = field(default_factory=list)
    delta: list = field(default_factory=list)
    times: list = field(default_factory=list)
    timestamps: list = field(default_factory=list)

    @property
    def avg_time(self):
        """Average duration (seconds) of the executed requests, 0 if none were executed."""
        # divide by executed requests rather than `runs`: an aborted test executes fewer than `runs` requests
        return sum(self.times) / len(self.times) if self.times else 0.0

    @property
    def min_time(self):
        """Shortest request duration (seconds), 0 if none were executed."""
        return min(self.times, default=0.0)

    @property
    def max_time(self):
        """Longest request duration (seconds), 0 if none were executed."""
        return max(self.times, default=0.0)

    @property
    def sum_err_code(self):
        """Amount of requests whose HTTP status code differs from the expected ``code``."""
        return sum(code != self.code for code in self.codes)

    def __str__(self):
        columns = ["Run", "Codes", "Delta", "Times", "Timestamps"]
        idx = len(str(self.runs))
        r = max(len(columns[0]), idx)
        w = 22
        header = "".join(f"{c:>{w if i else r}}" for i, c in enumerate(columns))
        offset = 16  # spaces offset of result lines, relative to this file
        data = []
        for i, (c, d, t, ts) in enumerate(zip(self.codes, self.delta, self.times, self.timestamps)):
            if c == self.code:
                marker = ""
            elif c == 408:
                marker = "(x) "  # request aborted by timeout
            else:
                marker = "(!) "  # unexpected HTTP status code
            data.append(
                f"{i + 1:>{r + (offset if i else 0)}}"  # first line is already indented by the template below
                f"{marker + str(c):>{w}}"
                f"{d:>{w - 1}.3f}s"
                f"{t:>{w - 1}.3f}s"
                f"{ts:>{w}}"
            )
        lines = "\n".join(data)
        if self.status == 0:
            summary = [
                "All passing conditions have been achieved.",
            ]
        elif self.status == -1:
            summary = [
                f"Detected {self.sum_err_code} erroneous HTTP codes not equal to expected {self.code}."
            ]
        elif self.status == -2:
            summary = [
                "Detected regression with long request time.",
                f"Expected avg-time <= max-avg-time: ({self.avg_time:.3f}s > {self.max_avg_time:.3f}s)."
            ]
        elif self.status == -3:
            summary = [
                f"Maximum number of timeout ({self.timeout_abort}s) requests exceeded ({self.timeout_count}).",
                "Test was aborted to avoid further delays."
            ]
        else:
            summary = [
                "Undefined failure result status condition encountered.",
            ]
        summary.append(f"Test {'succeeded' if self.status == 0 else 'failed'} (status={self.status}).")
        summary = ("\n" + offset * " ").join(summary)
        return cleandoc(f"""
        Stress Test:
            Test:
                code: {self.code}
                runs: {self.runs}
                max-avg-time:  {self.max_avg_time}s
                max-err-code:  {self.max_err_code}
                sum-err-code:  {self.sum_err_code}
                timeout-abort: {self.timeout_abort}s
                timeout-retry: {self.timeout_retry}
                timeout-count: {self.timeout_count}
            Request:
                method: {self.method}
                url:    {self.url}
                args:   {self.request_args}
            Times:
                min: {self.min_time:.3f}s
                avg: {self.avg_time:.3f}s
                max: {self.max_time:.3f}s
            Results:
                {header}
                {lines}
            Summary:
                {summary}
        """)


def stress_test_requests(progression: Progression, url: str, method="GET", runs=100, code=200, delays=True,
                         max_err_code=0, max_avg_time=None,
                         abort_timeout=5, abort_retries=3, **req_kwargs) -> StressTestResult:
    """
    Executes the request for the number of demanded runs and validates the expected status is always returned.

    Outputs the results of each request and a summary of their execution time.
    If requested, also validates that all responses were returned on average faster than the maximum allowed time.

    :param progression: shared Progression object updated as requests complete
    :param url: endpoint to stress test
    :param method: HTTP method for request
    :param runs: number of stress test request
    :param code: expected HTTP code from requests
    :param delays: whether to apply small random delays (1 to 100 ms) before each request after the first.
       Otherwise, sequential requests are executed as quickly as possible, when the previous response is obtained.
    :param max_err_code: maximum amount of erroneous HTTP status code allowed to consider the test successful.
    :param max_avg_time: maximum average time of requests permitted to consider the test successful.
    :param abort_timeout: duration to wait until a request is aborted, sets 408 (Read Timeout) as HTTP status code.
    :param abort_retries: number of failed timeout requests allowed before abort of whole stress test for this endpoint.
    :param req_kwargs: additional arguments passed to ``requests.request`` (any ``timeout`` is ignored,
       ``abort_timeout`` is always used instead).
    :returns:
        StressTestResult with individual request results and one of below status:
        -  0 (success) for no error and all conditions achieved
        - -1 (failure) for maximum amount of HTTP error code reached
        - -2 (failure) for maximum request time on average reached
        - -3 (failure) for aborted test due to too many timeout
    """
    req_kwargs.pop("timeout", None)  # the per-request timeout is controlled by `abort_timeout`
    result = StressTestResult()
    result.code = code
    result.runs = runs
    result.url = url
    result.method = method
    result.request_args = req_kwargs
    result.max_err_code = max_err_code
    result.max_avg_time = max_avg_time
    result.timeout_abort = abort_timeout
    result.timeout_retry = abort_retries
    result.codes = []
    result.times = []
    result.timestamps = []
    # delay (seconds) slept *before* each request; the first request is sent immediately
    result.delta = [0.] + [float((random.randint(1, 100) / 1000) if delays else 0) for _ in range(1, runs)]

    progress_update = 50  # report the shared progression every N completed requests
    reported = 0          # completed requests already added to the shared progression
    for i in range(runs):
        if result.delta[i]:
            time.sleep(result.delta[i])
        start = time.perf_counter()
        try:
            resp = requests.request(method, url, timeout=abort_timeout, **req_kwargs)
        except requests.exceptions.Timeout:
            result.times.append(abort_timeout)
            result.codes.append(408)  # read timeout
            result.timeout_count += 1
        else:
            result.times.append(time.perf_counter() - start)
            result.codes.append(resp.status_code)
            if resp.status_code == 500:
                print(resp.text)
        # recorded for every run (timeouts included) so that all per-run lists stay aligned
        result.timestamps.append(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

        done = i + 1
        if result.timeout_count > abort_retries:
            if done > reported:
                progression.increase(done - reported)
            result.status = -3
            print(f"Aborted: Too Many Timeout ({result.timeout_count})")
            return result
        if done % progress_update == 0 or done == runs:
            progression.increase(done - reported)
            reported = done

    avg_time = sum(result.times) / len(result.times) if result.times else 0.0
    if max_avg_time and avg_time > max_avg_time:
        result.status = -2
    elif sum(c == code for c in result.codes) >= (runs - max_err_code):
        result.status = 0
    else:
        result.status = -1
    return result


In [8]:
def run_threads(target_test, url, execution_id: int = 1, n_threads: int = 1, runs_per_threads: int = 100):
    """
    Launch ``n_threads`` threads that each call ``target_test(url, progression)``, then wait for all of them.

    All threads share the same ``Progression`` instance.

    :param target_test: function to be executed by the threads, called as ``target_test(url, progression)``
    :param url: url to be called by the test
    :param execution_id: execution id to keep track of multiple executions, if this function is run in a loop
    :param n_threads: number of threads to be launched
    :param runs_per_threads: number of stress test requests per thread
    :raises AssertionError: if ``target_test`` raised an exception in at least one thread
    """

    progression = Progression(n_threads, runs_per_threads)

    line = '-' * 104
    print(line)
    print(f'URL Execution-{execution_id}')
    print(f'- Starting {n_threads} threads with {runs_per_threads} runs per threads targeting "{target_test.__name__}" fonction.')
    print(f'- URL : {url}')
    print(f'- Initial progression values : {progression}')
    print(line)

    failures = []  # (thread name, exception) of every thread whose target raised
    failures_lock = threading.Lock()

    def _run_target(*args):
        # Exceptions raised inside a thread never reach `join()`; record them so the caller fails afterwards.
        try:
            target_test(*args)
        except Exception as exc:
            with failures_lock:
                failures.append((threading.current_thread().name, exc))
            raise  # still reported by `threading.excepthook` in the thread, as before

    threads_list = [Thread(target=_run_target, args=(url, progression), name=f"Thread-{i+1}") for i in range(n_threads)]

    for t in threads_list:
        t.start()

    for t in threads_list:
        t.join()

    if failures:
        failed_names = ", ".join(f"{name} ({type(exc).__name__})" for name, exc in failures)
        raise AssertionError(f"{len(failures)}/{n_threads} threads failed: {failed_names}") from failures[0][1]
        
    
        
def stress_test(url: str, progression: Progression):
    """
    Stress test a single URL from the current thread and fail if any passing condition is not met.

    The number of requests is ``progression.runs_per_threads``; the thresholds and SSL verification come from
    the ``TEST_*`` and ``VERIFY_SSL`` configuration values. Intended as the ``target_test`` of ``run_threads``.

    :param url: url to be requested
    :param progression: Progression object to follow the progress of all requests
    :raises AssertionError: if the stress test result status is not 0 (success)
    """
    n_runs = progression.runs_per_threads

    expect_status_code = 200
    results = stress_test_requests(progression, url, runs=n_runs, code=expect_status_code,
                                   max_err_code=TEST_MAX_ERR_CODE, max_avg_time=TEST_MAX_AVG_TIME,
                                   abort_retries=TEST_TIMEOUT_RETRY, abort_timeout=TEST_TIMEOUT_ABORT,
                                   verify=VERIFY_SSL)  # honor DISABLE_VERIFY_SSL from the configuration

    thread_name = threading.current_thread().name
    print(f"\n({thread_name}) Stress Test with [{n_runs}] calls \n to : [{url}]")
    print(results)
    if results.status != 0:
        # explicit raise rather than `assert` so the check is not stripped under `python -O`
        raise AssertionError(f"Failed 1 tests.  Failed results: \n{results}")
    print(f"\n{thread_name} : All tests passed!")

## Tests

In [28]:
# # NBVAL_IGNORE_OUTPUT

# # Executing WPS test on multiple threads
# wps_urls = [f"{TWITCHER_URL}/{bird}/wps?service=wps&request=getcapabilities" for bird in TEST_WPS_BIRDS]

# for execution, url in enumerate(wps_urls):
#     run_threads(target_test = stress_test,
#                 url = url,
#                 execution_id = execution,
#                 n_threads = TEST_N_THREADS_WPS,
#                 runs_per_threads = TEST_RUNS)

In [27]:
# NBVAL_IGNORE_OUTPUT

# Executing THREDDS test on multiple threads: every thread repeatedly requests the catalog page of one test dataset
thredds_url = f"{TWITCHER_URL}/thredds/catalog/birdhouse/testdata/catalog.html?dataset=birdhouse/testdata/ta_Amon_MRI-CGCM3_decadal1980_r1i1p1_199101-200012.nc"

run_threads(
    stress_test,
    thredds_url,
    n_threads=TEST_N_THREADS_THREDDS,
    runs_per_threads=TEST_RUNS_THREDDS,
)