# How to

1. Collect benchmarks for an existing Cloud Pipeline deployment by executing all the cells below
2. Deploy the Cloud Pipeline billing API updates
3. Rename the following files in the `benchmarks` directory so that they are loaded as the previous (baseline) benchmarks
    - report-benchmarks-2022-01-02.h5 -> report-benchmarks-2022-01-01.h5
    - bar-chart-benchmarks-2022-01-02.h5 -> bar-chart-benchmarks-2022-01-01.h5
    - plot-chart-benchmarks-2022-01-02.h5 -> plot-chart-benchmarks-2022-01-01.h5
4. Collect new benchmarks for the updated Cloud Pipeline deployment by re-executing all the cells below
5. Study the plots

In [None]:
import functools

import time

import datetime
import json
import logging
import os
import pathlib
import subprocess
import traceback
import zipfile
import sys
import tarfile

# Labels of aggregate statistics.
MIN = 'min'
MAX = 'max'
MEAN = 'mean'

# Benchmarked billing periods; they are also the column names of the benchmark frames.
MONTH = 'MONTH'
QUARTER = 'QUARTER'
YEAR = 'YEAR'

# Values sent as 'grouping' in bar chart requests. Some of them are reused
# as report types and as plot chart names.
RESOURCE_TYPE = 'RESOURCE_TYPE'
RUN_INSTANCE_TYPE = 'RUN_INSTANCE_TYPE'
RUN_COMPUTE_TYPE = 'RUN_COMPUTE_TYPE'
PIPELINE = 'PIPELINE'
TOOL = 'TOOL'
STORAGE = 'STORAGE'
STORAGE_TYPE = 'STORAGE_TYPE'
USER = 'USER'
BILLING_CENTER = 'BILLING_CENTER'

# Names of the benchmarked plot charts; the request filters that belong to
# each of them are declared in benchmark_plot_charts.
GENERAL = 'GENERAL'
FILE_STORAGE = 'FILE_STORAGE'
OBJECT_STORAGE = 'OBJECT_STORAGE'
COMPUTE = 'COMPUTE'
CPU_COMPUTE = 'CPU_COMPUTE'
GPU_COMPUTE = 'GPU_COMPUTE'

# Report types that are not already declared above.
RUN = 'RUN'
INSTANCE = 'INSTANCE'


class BenchmarkError(RuntimeError):
    """Raised when a benchmarking step cannot be completed."""


def _mkdir(path):
    """Create the directory at path with all missing parents; an existing directory is fine."""
    directory = pathlib.Path(path)
    directory.mkdir(exist_ok=True, parents=True)


def _install_python_packages(packages):
    """Quietly pip-install the given requirements with the running interpreter.

    Raises:
        subprocess.CalledProcessError: if pip exits with a non-zero code.
    """
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', *packages])


def _download_file(source_url, target_path):
    """Download source_url and store the response body at target_path.

    TLS certificate verification is disabled for the request.

    Raises:
        BenchmarkError: if the server responds with an HTTP error status.
    """
    import requests
    response = requests.get(source_url, verify=False)
    try:
        # Fail fast instead of silently saving an error page as the target file.
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        raise BenchmarkError(f'Download of {source_url} has failed.') from e
    with open(target_path, 'wb') as f:
        f.write(response.content)


def _extract_archive(source_path, target_path):
    """Unpack a .tar.gz/.tgz or .zip archive into the target_path directory.

    Raises:
        BenchmarkError: if the file extension is not one of the supported ones.
    """
    is_gzipped_tar = source_path.endswith(('.tar.gz', '.tgz'))
    is_zip = source_path.endswith('.zip')
    if not (is_gzipped_tar or is_zip):
        raise BenchmarkError(f'Unsupported archive type: {source_path}')
    if is_gzipped_tar:
        archive = tarfile.open(source_path, 'r:gz')
    else:
        archive = zipfile.ZipFile(source_path, 'r')
    # Both archive types are context managers exposing the same extractall call.
    with archive:
        archive.extractall(path=target_path)


def _extract_parameter(name, default='', default_provider=lambda: ''):
    """Resolve a parameter from the environment and write the result back to it.

    The first non-empty value wins, in this order: the environment variable,
    the value returned by default_provider, and finally default.
    """
    value = os.getenv(name, default)
    if not value:
        value = default_provider() or default
    # Export the resolved value so that it is visible through the environment as well.
    os.environ[name] = value
    return value


def _get_previous_day_datetime_bounds(today):
    """Return the UTC (start, end) datetimes of the day that precedes today.

    The start is the first and the end is the last representable moment of that day.
    """
    previous_day = today - datetime.timedelta(days=1)
    utc = datetime.timezone.utc
    day_start = datetime.datetime.combine(previous_day, datetime.time.min, tzinfo=utc)
    day_end = datetime.datetime.combine(previous_day, datetime.time.max, tzinfo=utc)
    return day_start, day_end


def _get_previous_month_datetime_bounds(today):
    """Return the UTC (start, end) datetimes of the calendar month that precedes today."""
    # The day before the first day of the current month is the last day of the previous one.
    month_end = today.replace(day=1) - datetime.timedelta(days=1)
    month_start = month_end.replace(day=1)
    utc = datetime.timezone.utc
    return (datetime.datetime.combine(month_start, datetime.time.min, tzinfo=utc),
            datetime.datetime.combine(month_end, datetime.time.max, tzinfo=utc))


def _get_previous_quarter_datetime_bounds(today):
    """Return the UTC (start, end) datetimes of the calendar quarter that precedes today."""
    import calendar
    current_quarter = (today.month - 1) // 3 + 1
    if current_quarter > 1:
        year, quarter = today.year, current_quarter - 1
    else:
        # The quarter before Q1 is Q4 of the previous year.
        year, quarter = today.year - 1, 4
    first_month = 3 * (quarter - 1) + 1
    last_month = first_month + 2
    _, days_in_last_month = calendar.monthrange(year, last_month)
    quarter_start = datetime.date(year, first_month, 1)
    quarter_end = datetime.date(year, last_month, days_in_last_month)
    utc = datetime.timezone.utc
    return (datetime.datetime.combine(quarter_start, datetime.time.min, tzinfo=utc),
            datetime.datetime.combine(quarter_end, datetime.time.max, tzinfo=utc))


def _get_previous_year_datetime_bounds(today):
    """Return the UTC (start, end) datetimes of the calendar year that precedes today."""
    # The day before January 1st of the current year is the last day of the previous year.
    year_end = today.replace(month=1, day=1) - datetime.timedelta(days=1)
    year_start = year_end.replace(month=1, day=1)
    utc = datetime.timezone.utc
    return (datetime.datetime.combine(year_start, datetime.time.min, tzinfo=utc),
            datetime.datetime.combine(year_end, datetime.time.max, tzinfo=utc))


def _clear_billing_caches(elastic_url):
    """Ask Elasticsearch to clear the caches of every index matching *billing*.

    Raises:
        BenchmarkError: if Elasticsearch responds with an HTTP error status.
    """
    import requests
    logging.debug('Clearing elastic caches...')
    cache_clear_url = elastic_url + '*billing*/_cache/clear'
    try:
        requests.post(cache_clear_url).raise_for_status()
    except requests.exceptions.HTTPError as e:
        raise BenchmarkError('Elastic caches clearing has failed.') from e


def _to_datetime_str(to_datetime):
    """Format a datetime as YYYY-MM-DDTHH:MM:SS.mmmZ (millisecond precision, 'Z' suffix)."""
    with_microseconds = to_datetime.strftime('%Y-%m-%dT%H:%M:%S.%f')
    # Dropping the last three digits truncates microseconds to milliseconds.
    with_milliseconds = with_microseconds[:-3]
    return f'{with_milliseconds}Z'


def _request_bar_chart_billings(grouping, api, from_datetime, to_datetime):
    """Request billing/charts grouped by grouping and return the elapsed time.

    Any failure is logged and the request is repeated after a 10 second pause
    until it succeeds; there is no limit on the number of attempts.
    """
    while True:
        try:
            payload = {
                'from': _to_datetime_str(from_datetime),
                'to': _to_datetime_str(to_datetime),
                'grouping': grouping
            }
            elapsed_time, _ = api._request_benchmark(http_method='post', endpoint='billing/charts',
                                                     data=payload)
        except Exception:
            logging.exception('Request has failed. Retrying in 10s...')
            time.sleep(10)
        else:
            return elapsed_time


def _request_plot_chart_billings(interval, filters, api, from_datetime, to_datetime):
    """Request billing/charts with the given interval and filters and return the elapsed time.

    Any failure is logged and the request is repeated after a 10 second pause
    until it succeeds; there is no limit on the number of attempts.
    """
    while True:
        try:
            payload = {
                'from': _to_datetime_str(from_datetime),
                'to': _to_datetime_str(to_datetime),
                'interval': interval,
                'filters': filters
            }
            elapsed_time, _ = api._request_benchmark(http_method='post', endpoint='billing/charts',
                                                     data=payload)
        except Exception:
            logging.exception('Request has failed. Retrying in 10s...')
            time.sleep(10)
        else:
            return elapsed_time


def _request_report_billings(report, api, from_datetime, to_datetime):
    """Download a billing/export report of the given type and return the elapsed time.

    Any failure is logged and the request is repeated after a 10 second pause
    until it succeeds; there is no limit on the number of attempts.
    """
    while True:
        try:
            payload = {
                'from': _to_datetime_str(from_datetime),
                'to': _to_datetime_str(to_datetime),
                'types': [report]
            }
            elapsed_time, _ = api._download_benchmark(http_method='post', endpoint='billing/export',
                                                      data=payload)
        except Exception:
            logging.exception('Request has failed. Retrying in 10s...')
            time.sleep(10)
        else:
            return elapsed_time


def benchmark_bar_charts(today, api, elastic_url, iterations):
    """Time bar chart requests for every grouping over the previous month, quarter and year.

    The billing caches are cleared before each request. Returns a DataFrame
    indexed by grouping with one column per period; every cell holds the list
    of elapsed times of the performed iterations.
    """
    import pandas as pd
    periods = (
        (MONTH, _get_previous_month_datetime_bounds),
        (QUARTER, _get_previous_quarter_datetime_bounds),
        (YEAR, _get_previous_year_datetime_bounds),
    )
    groupings = (
        RESOURCE_TYPE,
        RUN_INSTANCE_TYPE,
        RUN_COMPUTE_TYPE,
        PIPELINE,
        TOOL,
        STORAGE,
        STORAGE_TYPE,
        USER,
        BILLING_CENTER,
    )
    timings_by_chart = {}
    for grouping in groupings:
        timings_by_period = timings_by_chart.setdefault(grouping, {})
        for period, get_bounds in periods:
            logging.info(f'Benchmarking {grouping} {period} bar charts...')
            period_from, period_to = get_bounds(today)
            timings = timings_by_period.setdefault(period, [])
            for performed in range(1, iterations + 1):
                _clear_billing_caches(elastic_url)
                timings.append(_request_bar_chart_billings(grouping, api, period_from, period_to))
                logging.debug(f'Performed {performed}/{iterations} benchmarks...')
    return pd.DataFrame(timings_by_chart).transpose()


def benchmark_plot_charts(today, api, elastic_url, iterations):
    """Time plot chart requests for every filter set over the previous month, quarter and year.

    The billing caches are cleared before each request. Returns a DataFrame
    indexed by plot chart name with one column per period; every cell holds
    the list of elapsed times of the performed iterations.
    """
    import pandas as pd
    # Each period is requested with its own histogram interval.
    periods = (
        (MONTH, _get_previous_month_datetime_bounds, '1d'),
        (QUARTER, _get_previous_quarter_datetime_bounds, '1M'),
        (YEAR, _get_previous_year_datetime_bounds, '1M'),
    )
    charts = (
        (GENERAL, {}),
        (STORAGE, {"resource_type": ["STORAGE"]}),
        (FILE_STORAGE, {"resource_type": ["STORAGE"], "storage_type": ["FILE_STORAGE"]}),
        (OBJECT_STORAGE, {"resource_type": ["STORAGE"], "storage_type": ["OBJECT_STORAGE"]}),
        (COMPUTE, {"resource_type": ["COMPUTE"]}),
        (CPU_COMPUTE, {"resource_type": ["COMPUTE"], "compute_type": ["CPU"]}),
        (GPU_COMPUTE, {"resource_type": ["COMPUTE"], "compute_type": ["GPU"]}),
    )
    timings_by_chart = {}
    for chart, filters in charts:
        timings_by_period = timings_by_chart.setdefault(chart, {})
        for period, get_bounds, interval in periods:
            logging.info(f'Benchmarking {chart} {period} plot charts...')
            period_from, period_to = get_bounds(today)
            timings = timings_by_period.setdefault(period, [])
            for performed in range(1, iterations + 1):
                _clear_billing_caches(elastic_url)
                timings.append(_request_plot_chart_billings(interval, filters, api, period_from, period_to))
                logging.debug(f'Performed {performed}/{iterations} benchmarks...')
    return pd.DataFrame(timings_by_chart).transpose()


def benchmark_reports(today, api, elastic_url, iterations):
    """Time report exports for every report type over the previous month, quarter and year.

    The billing caches are cleared before each request. Returns a DataFrame
    indexed by report type with one column per period; every cell holds the
    list of elapsed times of the performed iterations.
    """
    import pandas as pd
    periods = (
        (MONTH, _get_previous_month_datetime_bounds),
        (QUARTER, _get_previous_quarter_datetime_bounds),
        (YEAR, _get_previous_year_datetime_bounds),
    )
    report_types = (
        RUN,
        USER,
        BILLING_CENTER,
        INSTANCE,
        PIPELINE,
        TOOL,
        STORAGE,
    )
    timings_by_report = {}
    for report in report_types:
        timings_by_period = timings_by_report.setdefault(report, {})
        for period, get_bounds in periods:
            logging.info(f'Benchmarking {report} {period} reports...')
            period_from, period_to = get_bounds(today)
            timings = timings_by_period.setdefault(period, [])
            for performed in range(1, iterations + 1):
                _clear_billing_caches(elastic_url)
                timings.append(_request_report_billings(report, api, period_from, period_to))
                logging.debug(f'Performed {performed}/{iterations} benchmarks...')
    return pd.DataFrame(timings_by_report).transpose()


def _benchmark_path(type, analysis_dir, date):
    """Return <analysis_dir>/<type>-benchmarks-<ISO date>.h5."""
    file_name = ''.join([type, '-benchmarks-', date.isoformat(), '.h5'])
    return os.path.join(analysis_dir, file_name)


def _benchmark(benchmark_type, benchmark_func, benchmarks_path, today, api, elastic_url, iterations):
    """Return the benchmarks stored at benchmarks_path, collecting them first if the file is missing.

    Freshly collected benchmarks are produced by benchmark_func and saved to
    benchmarks_path under the 'df' key before being returned.
    """
    import pandas as pd
    if os.path.exists(benchmarks_path):
        logging.info('Loading existing %s benchmarks...' % benchmark_type)
        return pd.read_hdf(benchmarks_path, 'df')
    logging.info('Benchmarking %ss...' % benchmark_type)
    collected = benchmark_func(today, api, elastic_url, iterations)
    collected.to_hdf(benchmarks_path, 'df')
    return collected


def _calculate_stats(benchmarks):
    """Reduce every list of elapsed times to a single representative value.

    Outliers are trimmed with _throw_out_outliers and the median of the
    remaining samples is taken. An empty frame is returned unchanged.

    Returns:
        A DataFrame with the same index and the MONTH, QUARTER and YEAR
        columns holding one float per cell.
    """
    # Imported locally, like in the other helpers, so that the function does
    # not depend on a module level `np` bound by a later notebook cell.
    import numpy as np
    if benchmarks.empty:
        return benchmarks

    def _trimmed_median(samples):
        return np.median(_throw_out_outliers(samples))

    return benchmarks[[MONTH, QUARTER, YEAR]].applymap(_trimmed_median)
    
    
def _throw_out_outliers(arr, percent_padding=5):
    """Sort the samples and drop percent_padding percent of them from each end.

    At least one sample is dropped from each end, and at least the middle
    sample(s) are always kept. Inputs with fewer than three samples are
    returned as is (unsorted, original type).

    Returns:
        A sorted numpy array of the remaining samples, or arr itself when it
        has fewer than three elements.
    """
    # Imported locally, like in the other helpers, so that the function does
    # not depend on a module level `np` bound by a later notebook cell.
    import numpy as np
    if len(arr) < 3:
        return arr
    arr = np.sort(arr)
    length_padding = max(1, int(len(arr) * (percent_padding / 100.0)))
    # Never trim every sample away: a large percent_padding would otherwise
    # leave an empty array behind.
    length_padding = min(length_padding, (len(arr) - 1) // 2)
    return arr[length_padding:-length_padding]


def _load_previous_benchmark(benchmark_type, benchmarks_path):
    """Load the benchmarks stored at benchmarks_path, or an empty DataFrame if the file is missing."""
    import pandas as pd
    if os.path.exists(benchmarks_path):
        logging.info('Loading previous %s benchmarks...' % benchmark_type)
        return pd.read_hdf(benchmarks_path, 'df')
    return pd.DataFrame()


def _show_plot(title, benchmarks):
    """Draw the benchmarks frame as a titled plot with 45 degree tick labels and show it."""
    import matplotlib.pyplot as plt
    plot_options = {'title': title, 'rot': 45}
    benchmarks.plot(**plot_options)
    plt.show()


def _show_improvements_heatmap(title, difference_benchmarks, bounds):
    """Show the differences frame as a red-white-green heatmap.

    bounds is the (min, max) pair mapped to the ends of the color scale.
    """
    import numpy as np
    from matplotlib import pyplot as plt
    from matplotlib.colors import LinearSegmentedColormap
    lower_bound, upper_bound = bounds[0], bounds[1]
    red_white_green = LinearSegmentedColormap.from_list('rg', ['r', 'w', 'g'], N=256)
    mesh = plt.pcolormesh(difference_benchmarks, cmap=red_white_green, vmin=lower_bound, vmax=upper_bound)
    # Ticks are placed at the cell centers, hence the 0.5 offset.
    row_labels = difference_benchmarks.index
    column_labels = difference_benchmarks.columns
    plt.yticks(np.arange(len(row_labels)) + 0.5, row_labels)
    plt.xticks(np.arange(len(column_labels)) + 0.5, column_labels)
    plt.title(title)
    plt.colorbar(mesh)
    plt.show()

    
def _diff_percent(benchmarks, previous_benchmarks):
    """Return the improvement over previous_benchmarks in percent of the previous values.

    Positive values mean the current benchmarks are faster. An empty
    DataFrame is returned if either of the inputs is empty.
    """
    import pandas as pd
    nothing_to_compare = benchmarks.empty or previous_benchmarks.empty
    if nothing_to_compare:
        return pd.DataFrame()
    improvement = previous_benchmarks - benchmarks
    return improvement / previous_benchmarks * 100

    
def _diff_abs(benchmarks, previous_benchmarks):
    """Return previous_benchmarks minus benchmarks, i.e. the absolute improvement.

    An empty DataFrame is returned if either of the inputs is empty.
    """
    import pandas as pd
    nothing_to_compare = benchmarks.empty or previous_benchmarks.empty
    if nothing_to_compare:
        return pd.DataFrame()
    improvement = previous_benchmarks - benchmarks
    return improvement

In [None]:
# Logging configuration.
logging_level = _extract_parameter('CP_LOGGING_LEVEL', 'DEBUG')
logging_format = _extract_parameter('CP_LOGGING_FORMAT', '%(asctime)s:%(levelname)s: %(message)s')

# Endpoints of the benchmarked deployment.
api_url = _extract_parameter('API', 'https://cp-api-srv.default.svc.cluster.local:31080/pipeline/restapi/')
api_token = _extract_parameter('API_TOKEN')
distribution_url = _extract_parameter('DISTRIBUTION_URL',
                                      'https://cp-api-srv.default.svc.cluster.local:31080/pipeline/')
elastic_url = _extract_parameter('CP_SEARCH_ELK_URL',
                                 'http://cp-search-elk.default.svc.cluster.local:30091/')

# Number of timed requests per chart/report and period.
iterations = int(_extract_parameter('CP_BENCHMARK_ITERATIONS', '100'))

# Everything the notebook produces is stored under ./benchmarks.
work_dir = os.path.join(os.getcwd(), 'benchmarks')
logs_dir, pipe_commons_dir = (os.path.join(work_dir, name) for name in ('logs', 'pipe-commons'))

In [None]:
logging.basicConfig(format=logging_format, level=logging_level)
logging.info('Verifying python version...')
# Only the major and minor version numbers matter for the check.
if sys.version_info[:2] < (3, 7):
    raise BenchmarkError('The benchmark requires at least python 3.7.')

In [None]:
logging.info('Creating system directories...')
for _directory in (work_dir, pipe_commons_dir, logs_dir):
    _mkdir(_directory)
del _directory

In [None]:
logging.info('Installing python packages...')
_install_python_packages([
    'urllib3==1.25.9',
    'requests==2.22.0',
    'pandas==1.3.5',
    'matplotlib==3.5.1',
    'tables==3.7.0',
])
# The packages can only be imported after they have been installed above.
import numpy as np
import pandas as pd
import requests
import urllib3
# Requests in this notebook are sent with verify=False; silence the resulting warnings.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

In [None]:
pipe_common_archive_path = os.path.join(pipe_commons_dir, 'pipe-common.tar.gz')
logging.info('Downloading pipe common...')
_download_file(distribution_url + 'pipe-common.tar.gz', pipe_common_archive_path)
logging.info('Unpacking pipe common...')
_extract_archive(pipe_common_archive_path, pipe_commons_dir)
logging.info('Installing pipe common...')
_install_python_packages([pipe_commons_dir])

In [None]:
# pipeline.api is expected to come from the pipe-common package installed in the previous cell.
from pipeline.api import PipelineAPI, APIError, HTTPError
# NOTE(review): attempts=1 presumably disables the client's own retries so that
# they cannot distort the measured times — confirm against pipe-common.
api = PipelineAPI(api_url=api_url, log_dir=logs_dir, attempts=1, timeout=120, connection_timeout=120)
import socket
# Default timeout, in seconds, for socket objects created from now on.
socket.setdefaulttimeout(120)

In [None]:
def _request_benchmark(self, http_method, endpoint, data=None):
    """Send a JSON request to the API and measure how long the call takes.

    Returns:
        A (elapsed seconds, response payload) tuple.

    Raises:
        HTTPError: if the response status code is not 200.
        APIError: if the response body does not report the 'OK' status.
    """
    # The url and the body are prepared up front so that only the HTTP call is timed.
    target_url = '{}/{}'.format(self.api_url, endpoint)
    body = json.dumps(data)
    started_at = time.monotonic()
    response = requests.request(method=http_method, url=target_url, data=body,
                                headers=self.header, verify=False,
                                timeout=self.connection_timeout)
    elapsed_time = time.monotonic() - started_at
    if response.status_code != 200:
        raise HTTPError('API responded with http status %s.' % str(response.status_code))
    response_data = response.json()
    status = response_data.get('status') or 'ERROR'
    if status != 'OK':
        message = response_data.get('message') or 'No message'
        raise APIError('%s: %s' % (status, message))
    return elapsed_time, response_data.get('payload')


def _download_benchmark(self, http_method, endpoint, data=None):
    """Send a request to the API and measure how long it takes to download the whole response.

    The response body is read in 1 MiB chunks and discarded.

    Returns:
        A (elapsed seconds, None) tuple.

    Raises:
        HTTPError: if the response status code is not 200.
    """
    one_mebibyte = 1 * 1024 * 1024
    # The url and the body are prepared up front so that only the HTTP exchange is timed.
    target_url = '{}/{}'.format(self.api_url, endpoint)
    body = json.dumps(data)
    started_at = time.monotonic()
    response = requests.request(method=http_method, url=target_url, data=body,
                                headers=self.header, verify=False,
                                timeout=self.connection_timeout,
                                stream=True)
    for _chunk in response.iter_content(chunk_size=one_mebibyte):
        # The content itself is not needed, only the time it takes to receive it.
        continue
    elapsed_time = time.monotonic() - started_at
    if response.status_code != 200:
        raise HTTPError('API responded with http status %s.' % str(response.status_code))
    return elapsed_time, None

import types
# Attach the timing helpers to the api client as bound methods of the same names.
for _timing_helper in (_request_benchmark, _download_benchmark):
    setattr(api, _timing_helper.__name__, types.MethodType(_timing_helper, api))
del _timing_helper

In [None]:
# Fixed reference date: it determines the benchmarked periods and the names of the benchmark files.
today = datetime.date(2022, 1, 2)
one_day = datetime.timedelta(days=1)
yesterday = today - one_day

In [None]:
logging.info('Firing initial benchmarking request...')
warm_up_from, warm_up_to = _get_previous_day_datetime_bounds(today)
_request_bar_chart_billings(BILLING_CENTER, api, warm_up_from, warm_up_to)

In [None]:
bar_chart_benchmarks_path = _benchmark_path('bar-chart', work_dir, today)
previous_bar_chart_benchmarks_path = _benchmark_path('bar-chart', work_dir, yesterday)

# Current timings: collected now, or loaded if they have already been stored for today.
bar_chart_benchmarks = _calculate_stats(
    _benchmark('bar chart', benchmark_bar_charts, bar_chart_benchmarks_path,
               today, api, elastic_url, iterations))
# Baseline timings: empty if there is no benchmark file for the previous day.
previous_bar_chart_benchmarks = _calculate_stats(
    _load_previous_benchmark('bar chart', previous_bar_chart_benchmarks_path))

bar_chart_diff_benchmarks = _diff_percent(bar_chart_benchmarks,
                                          previous_bar_chart_benchmarks)
bar_chart_diff_abs_benchmarks = _diff_abs(bar_chart_benchmarks,
                                          previous_bar_chart_benchmarks)

In [None]:
# Cell output: the aggregated bar chart timings per grouping and period.
bar_chart_benchmarks

In [None]:
plot_chart_benchmarks_path = _benchmark_path('plot-chart', work_dir, today)
previous_plot_chart_benchmarks_path = _benchmark_path('plot-chart', work_dir, yesterday)

# Current timings: collected now, or loaded if they have already been stored for today.
plot_chart_benchmarks = _calculate_stats(
    _benchmark('plot chart', benchmark_plot_charts, plot_chart_benchmarks_path,
               today, api, elastic_url, iterations))
# Baseline timings: empty if there is no benchmark file for the previous day.
previous_plot_chart_benchmarks = _calculate_stats(
    _load_previous_benchmark('plot chart', previous_plot_chart_benchmarks_path))

plot_chart_diff_benchmarks = _diff_percent(plot_chart_benchmarks,
                                           previous_plot_chart_benchmarks)
plot_chart_diff_abs_benchmarks = _diff_abs(plot_chart_benchmarks,
                                           previous_plot_chart_benchmarks)

In [None]:
# Cell output: the aggregated plot chart timings per chart and period.
plot_chart_benchmarks

In [None]:
report_benchmarks_path = _benchmark_path('report', work_dir, today)
# Current timings: collected now, or loaded if they have already been stored for today.
report_benchmarks = _benchmark('report', benchmark_reports, report_benchmarks_path, today, api, elastic_url, iterations)
report_benchmarks = _calculate_stats(report_benchmarks)
previous_report_benchmarks_path = _benchmark_path('report', work_dir, yesterday)
# Baseline timings: empty if there is no benchmark file for the previous day.
# The benchmark type is only used in the log message; it has to say 'report' here.
previous_report_benchmarks = _calculate_stats(_load_previous_benchmark('report', previous_report_benchmarks_path))
report_diff_benchmarks = _diff_percent(report_benchmarks, previous_report_benchmarks)
report_diff_abs_benchmarks = _diff_abs(report_benchmarks, previous_report_benchmarks)

In [None]:
# Cell output: the aggregated report timings per report type and period.
report_benchmarks

In [None]:
for _plot_title, _plotted_benchmarks in (
    ('Bar charts', bar_chart_benchmarks),
    ('Plot charts', plot_chart_benchmarks),
    ('Reports', report_benchmarks),
):
    _show_plot(_plot_title, _plotted_benchmarks)
del _plot_title, _plotted_benchmarks

In [None]:
# Current and previous timings side by side; the previous columns get a 'PREVIOUS ' prefix.
for _plot_title, _current_benchmarks, _previous_benchmarks in (
    ('Bar charts', bar_chart_benchmarks, previous_bar_chart_benchmarks),
    ('Plot charts', plot_chart_benchmarks, previous_plot_chart_benchmarks),
    ('Reports', report_benchmarks, previous_report_benchmarks),
):
    _prefixed_previous_benchmarks = _previous_benchmarks.rename(columns=lambda name: 'PREVIOUS ' + name)
    _show_plot(_plot_title, _current_benchmarks.join(_prefixed_previous_benchmarks))
del _plot_title, _current_benchmarks, _previous_benchmarks, _prefixed_previous_benchmarks

In [None]:
diff_benchmarks

In [None]:
diff_benchmarks = pd.DataFrame()
if not bar_chart_diff_benchmarks.empty:
    diff_benchmarks = diff_benchmarks.append(bar_chart_diff_benchmarks.rename(lambda name: name + ' [BAR CHART]'))
if not plot_chart_diff_benchmarks.empty:
    diff_benchmarks = diff_benchmarks.append(plot_chart_diff_benchmarks.rename(lambda name: name + ' [PLOT CHART]'))
if not report_diff_benchmarks.empty:
    diff_benchmarks = diff_benchmarks.append(report_diff_benchmarks.rename(lambda name: name + ' [REPORT]'))
if not diff_benchmarks.empty:
    _show_improvements_heatmap('Performance difference, %', diff_benchmarks, (-100, 100))

In [None]:
diff_abs_benchmarks

In [None]:
diff_abs_benchmarks = pd.DataFrame()
if not bar_chart_diff_benchmarks.empty:
    diff_abs_benchmarks = diff_abs_benchmarks.append(bar_chart_diff_abs_benchmarks.rename(lambda name: name + ' [BAR CHART]'))
if not plot_chart_diff_benchmarks.empty:
    diff_abs_benchmarks = diff_abs_benchmarks.append(plot_chart_diff_abs_benchmarks.rename(lambda name: name + ' [PLOT CHART]'))
if not report_diff_benchmarks.empty:
    diff_abs_benchmarks = diff_abs_benchmarks.append(report_diff_abs_benchmarks.rename(lambda name: name + ' [REPORT]'))
if not diff_abs_benchmarks.empty:
    _show_improvements_heatmap('Performance difference, s', diff_abs_benchmarks, (-1, 1))

In [None]:
# Plot the absolute differences computed by _diff_abs in the cells above instead
# of subtracting the frames once more. The diff frames are empty when there
# are no previous benchmarks; there is nothing to plot in that case.
for _plot_title, _diff_abs_benchmarks in (
    ('Bar charts performance difference, s', bar_chart_diff_abs_benchmarks),
    ('Plot charts performance difference, s', plot_chart_diff_abs_benchmarks),
    ('Reports performance difference, s', report_diff_abs_benchmarks),
):
    if _diff_abs_benchmarks.empty:
        continue
    _show_plot(_plot_title, _diff_abs_benchmarks)
del _plot_title, _diff_abs_benchmarks