Update benchmark sampling method and statistics (#282)

popematt committed Aug 22, 2023
1 parent f3f1b3c commit c73cf2a

Showing 7 changed files with 213 additions and 94 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -9,7 +9,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.7', '3.8', '3.9', '3.10', 'pypy-3.7', 'pypy-3.8']
python-version: ['3.8', '3.9', '3.10', 'pypy-3.8', 'pypy-3.10']
fail-fast: false
steps:
- uses: actions/checkout@v2
60 changes: 42 additions & 18 deletions amazon/ionbenchmark/benchmark_runner.py
@@ -1,17 +1,16 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""
This module runs the benchmarks described by a `BenchmarkSpec` and collects their results.
"""
import gc
import tempfile
import os
import platform
import tempfile
import time
import timeit

from amazon.ionbenchmark.benchmark_spec import BenchmarkSpec
import amazon.ionbenchmark.Format as _format
from amazon.ionbenchmark.sample_dist import SampleDist

_pypy = platform.python_implementation() == 'PyPy'
if not _pypy:
@@ -21,24 +20,34 @@
class BenchmarkResult:
"""
Results generated by the `run_benchmark` function.
* `timings` is a list of numbers representing the number of nanoseconds to complete each iteration
* `batch_size` is the number of times the function was invoked in each iteration
* `peak_memory_usage` is the peak memory allocated during a single run of the benchmark function, in bytes
"""
timings = None
batch_size = None
peak_memory_usage = None
nanos_per_op: SampleDist = None
ops_per_second: SampleDist = None
peak_memory_usage = None # measured in bytes

def __init__(self, timings, batch_size, peak_memory_usage):
self.timings = timings
self.batch_size = batch_size
def __init__(self, nanos_per_op, ops_per_second, peak_memory_usage):
self.nanos_per_op = SampleDist(nanos_per_op)
self.ops_per_second = SampleDist(ops_per_second)
self.peak_memory_usage = peak_memory_usage


def run_benchmark(benchmark_spec: BenchmarkSpec):
"""
Run benchmarks for `benchmark_spec`.
The overall approach of this runner is to time multiple samples, where each sample consists of multiple invocations
of the test function. As a rule of thumb, a sample size of 30 is the minimum needed to have a useful level of
confidence in the results. The margin of error is (roughly speaking) inversely proportional to the square root of
the sample size, so adding more samples increases the confidence, but with diminishing returns. As a rule of thumb,
it is never worth having a sample size greater than 1000.

This approach is sound because of the Central Limit Theorem. For an approachable introduction, see
https://www.kristakingmath.com/blog/sampling-distribution-of-the-sample-mean.

The reason for multiple invocations per sample is to prevent very short functions from being dominated by
differences in memory locations or other small differences from one sample to the next. This runner uses the `Timer`
utility's `autorange()` function to determine the number of times the function must be invoked for it to run for
at least 1 second. That number is then used as the number of invocations for _every_ sample in the set.
"""
test_fun = _create_test_fun(benchmark_spec)

@@ -59,11 +68,26 @@ def run_benchmark(benchmark_spec: BenchmarkSpec):
# warm up
timer.timeit(benchmark_spec.get_warmups())

# iteration
(batch_size, _) = timer.autorange()
timings = timer.repeat(benchmark_spec.get_iterations(), batch_size)
# TODO: Consider making the target batch time or the batch size configurable instead of using this hack.
if "PYTEST_CURRENT_TEST" in os.environ:
# make the unit tests run in a reasonable time
batch_size = 1
else:
# range-finding
# This needs the default timer (measuring in seconds) to work correctly, so it's a different Timer instance.
(batch_size, _) = timeit.Timer(stmt=test_fun, setup=setup).autorange()
# Ad hoc testing indicates that samples of 1-2 seconds give tighter results than the default 0.2 seconds, but for
# very quick testing, this can be annoyingly slow.
batch_size *= 5 # ~1-2 seconds

# sample collection (iterations)
raw_timings = timer.repeat(benchmark_spec.get_iterations(), batch_size)

# Normalize the samples (i.e. remove the effect of the batch size) before returning the results
nanos_per_op = [t/batch_size for t in raw_timings]
ops_per_sec = [1000000000.0 / t for t in nanos_per_op]

return BenchmarkResult(timings, batch_size, peak_memory_usage)
return BenchmarkResult(nanos_per_op, ops_per_sec, peak_memory_usage)


def _create_test_fun(benchmark_spec: BenchmarkSpec):
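
The new `BenchmarkResult` stores its samples as `SampleDist` instances imported from `amazon.ionbenchmark.sample_dist`, which is not shown in this view. Below is a minimal sketch of what such a helper might look like, assuming it is a thin wrapper around Python's standard `statistics` module; the property names mirror the ones used by the report fields further down, but the implementation is illustrative only.

# Illustrative sketch only; the real implementation lives in
# amazon/ionbenchmark/sample_dist.py and is not shown in this diff.
import statistics
from math import sqrt
from statistics import NormalDist


class SampleDist(list):
    """A list of sample measurements plus basic descriptive statistics."""

    @property
    def mean(self):
        return statistics.mean(self)

    @property
    def min(self):
        return min(self)

    @property
    def max(self):
        return max(self)

    @property
    def stdev(self):
        # Sample (n - 1) standard deviation.
        return statistics.stdev(self)

    @property
    def rstdev(self):
        # Relative standard deviation (coefficient of variation).
        return self.stdev / self.mean

    def margin_of_error(self, confidence=0.999):
        # Half-width of a confidence interval for the sample mean: z * s / sqrt(n).
        # The 1/sqrt(n) factor is the "inversely proportional to the square root
        # of the sample size" behavior described in the run_benchmark docstring.
        z = NormalDist().inv_cdf(0.5 + confidence / 2)
        return z * self.stdev / sqrt(len(self))

A real implementation might use the t-distribution instead of the normal approximation when computing the margin of error; with the 30 or more samples recommended in the docstring above, the two are very close.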
44 changes: 31 additions & 13 deletions amazon/ionbenchmark/ion_benchmark_cli.py
@@ -37,7 +37,7 @@

from amazon.ionbenchmark.Format import format_is_ion, rewrite_file_to_format
from amazon.ionbenchmark.benchmark_runner import run_benchmark
from amazon.ionbenchmark.report import report_stats
from amazon.ionbenchmark.report import report_stats, get_report_field_by_name
from amazon.ionbenchmark.benchmark_spec import BenchmarkSpec

# Related PyPy incompatibility issue - https://github.com/amazon-ion/ion-python/issues/227
@@ -51,15 +51,15 @@ def compare_command():
Compare the results of two benchmarks to determine if <new_result> has regressed compared to <previous_result>.
Usage:
ion_python_benchmark_cli.py compare <previous_result> <new_result> [-fq][--abc <bool>][--threshold <THRESHOLD>][--output <PATH>][-c <FIELD>]...
ion_python_benchmark_cli.py compare <previous_result> <new_result> [-fq][--threshold <THRESHOLD>][--output <PATH>][-c <FIELDS>]
Arguments:
<previous_result> A report from running a benchmark at some point in the past.
<new_result> A new report to compare against
Options:
-c <FIELD>, --compare <FIELD> A field to compare in the reports. [default: file_size(B) time_min(ns)]
-c <FIELDS>, --compare <FIELDS> A comma separated list of fields to compare in the reports. [default: file_size,time_mean]
-o --output PATH File to write the regression report.
-q --quiet Suppress writing regressions to std out. [default: False]
-t <FLOAT>, --threshold <FLOAT> Margin of error for comparison. [default: 0.20]
@@ -70,7 +70,13 @@ def compare_command():
current_path = args['<new_result>']
output_file_for_comparison = args['--output']
regression_threshold = float(args['--threshold'])
comparison_keywords = args['--compare']
comparison_keywords_arg = args['--compare']

# TODO: Update this command to use the information in REPORT_FIELDS, such as the direction of improvement (doi).
# https://github.com/amazon-ion/ion-python/issues/281
# Without that (i.e. right now), the compare command will actually fail when the ops/sec metric improves. :S

comparison_fields = [get_report_field_by_name(name) for name in comparison_keywords_arg.split(",")]

with open(previous_path, 'br') as p, open(current_path, 'br') as c:
previous_results = ion.load(p)
@@ -81,18 +87,22 @@
# For results of each configuration pattern with the same file
for idx, prev_result in enumerate(previous_results):
cur_result = current_results[idx]
name = cur_result['name']
result = {'name': name}
for keyword in comparison_keywords:
cur = float(cur_result[keyword])
prev = float(prev_result[keyword])
case_name = cur_result['name']
result = {'name': case_name}
for field in comparison_fields:
if field.units is not None:
key = f"{field.name}({field.units})"
else:
key = field.name
cur = float(cur_result[key])
prev = float(prev_result[key])
relative_diff = (cur - prev) / prev
pct_diff = f"{relative_diff:.2%}"
result[keyword] = pct_diff
result[key] = pct_diff

if relative_diff > regression_threshold:
if not args['--quiet']:
print(f"{name} '{keyword}' changed by {pct_diff}: {prev} => {cur}")
print(f"{case_name} '{key}' changed by {pct_diff}: {prev} => {cur}")
has_regression = True

report.append(result)
@@ -207,7 +217,7 @@ def run_spec_command():
-o --output FILE Destination to store the report. If unset, prints to std out.
-r --report FIELDS Comma-separated list of fields to include in the report. [default: file_size, time_min, time_mean, memory_usage_peak]
-r --report FIELDS Comma-separated list of fields to include in the report. [default: file_size,ops/s_mean,ops/s_error,memory_usage_peak]
Example:
./ion_python_benchmark_cli.py run my_spec_file.ion -d '{iterations:1000}' -o '{warmups:0}' -r "time_min, file_size, peak_memory_usage"
@@ -284,7 +294,15 @@ def _run_benchmarks(specs: list, report_fields, output_file):
result_stats = report_stats(benchmark_spec, result, report_fields)
report.append(result_stats)

print(tabulate(report, tablefmt='fancy_grid', headers='keys'))
# TODO: Add some option to dump or otherwise expose the raw sample data. For now, you can
# uncomment the following lines to get the raw results as CSV that can be copy/pasted into a spreadsheet.
#
# printable_key = benchmark_spec.get_name().replace(" ", "").replace(",", "-")
# for _x in [printable_key, *result.timings]:
# print(_x, end=",")
# print("")

print(tabulate(report, tablefmt='pipe', headers='keys', floatfmt='.2f'))

if output_file:
des_dir = os.path.dirname(output_file)
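
As the TODO in `compare_command` notes, the comparison does not yet use the direction-of-improvement (`doi`) metadata attached to each `ReportField`, so an improvement in an ops/s metric can currently be flagged as a regression. The following is a hedged sketch of how the check could take `doi` into account once https://github.com/amazon-ion/ion-python/issues/281 is addressed; the helper name and its integration point are assumptions, not part of this commit.

# Illustrative only: a regression check that honors a field's direction of
# improvement (doi): +1 means larger is better (ops/s_*), -1 means smaller
# is better (time_*, file_size, memory_usage_peak).
def is_regression(prev, cur, threshold, doi):
    relative_diff = (cur - prev) / prev
    direction = doi if doi is not None else -1  # default: smaller is better
    # Only a change in the worsening direction beyond the threshold counts.
    return relative_diff * direction < -threshold


# ops/s going up 30% is an improvement, not a regression...
assert not is_regression(prev=100.0, cur=130.0, threshold=0.20, doi=+1)
# ...while time per op going up 30% is a regression.
assert is_regression(prev=100.0, cur=130.0, threshold=0.20, doi=-1)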
4 changes: 3 additions & 1 deletion amazon/ionbenchmark/ion_load_dump.py
@@ -15,7 +15,9 @@ class IonLoadDump:
def __init__(self, binary, c_ext=True):
self._binary = binary
self._single_value = False
self._c_ext = c_ext
# Need an explicit check here because passing `None` as an argument is different from passing no argument,
# and would otherwise result in unexpected behavior.
self._c_ext = c_ext if c_ext is not None else True

def loads(self, s):
ion.c_ext = self._c_ext
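
The `None` check above matters because callers may forward an unset option value verbatim. A small illustration of the intended behavior (the call sites are hypothetical):

# Illustrative only: both loaders should end up with the C extension enabled,
# even though the second explicitly passes None (e.g. an unset CLI option
# forwarded as-is).
from amazon.ionbenchmark.ion_load_dump import IonLoadDump

assert IonLoadDump(binary=True)._c_ext is True
assert IonLoadDump(binary=True, c_ext=None)._c_ext is True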
132 changes: 74 additions & 58 deletions amazon/ionbenchmark/report.py
@@ -1,14 +1,74 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import os
import statistics
from math import ceil

from collections.abc import Callable

from dataclasses import dataclass

from amazon.ionbenchmark.benchmark_runner import BenchmarkResult
from amazon.ionbenchmark.benchmark_spec import BenchmarkSpec


def report_stats(benchmark_spec: BenchmarkSpec, benchmark_result: BenchmarkResult, report_fields: list = None):
@dataclass(frozen=True)
class ReportField:
"""
Represents a field that can be included in the benchmark test report
"""
name: str
compute_fn: Callable # Callable[[BenchmarkSpec, BenchmarkResult], Any]
units: str = None
# Direction of improvement, used by the compare command
doi: int = None


REPORT_FIELDS = [
# TODO: Make sure we have the fields we need to perform a statistically meaningful comparison
# I.e. if we end up needing to use ANOVA or Independent Samples T Test, do we have the fields we need?
ReportField(name="format",
compute_fn=lambda spec, _: spec.get_format()),
ReportField(name="input_file",
compute_fn=lambda spec, _: spec.get_input_file()),
ReportField(name="operation",
compute_fn=lambda spec, _: spec.derive_operation_name()),
ReportField(name="file_size", units="B", doi=-1,
compute_fn=lambda spec, _: spec.get_input_file_size()),
ReportField(name="memory_usage_peak", units="B", doi=-1,
compute_fn=lambda _, result: result.peak_memory_usage),
ReportField(name="time_mean", units="ns", doi=-1,
compute_fn=lambda _, result: result.nanos_per_op.mean),
ReportField(name="time_min", units="ns", doi=-1,
compute_fn=lambda _, result: result.nanos_per_op.min),
ReportField(name="time_max", units="ns", doi=-1,
compute_fn=lambda _, result: result.nanos_per_op.max),
ReportField(name="time_sd", units="ns",
compute_fn=lambda _, result: result.nanos_per_op.stdev),
ReportField(name="time_rsd", units="%",
compute_fn=lambda _, result: result.nanos_per_op.rstdev * 100),
ReportField(name="time_error", units="ns",
compute_fn=lambda _, result: result.nanos_per_op.margin_of_error(confidence=0.999)),
ReportField(name="ops/s_mean", doi=+1,
compute_fn=lambda _, result: result.ops_per_second.mean),
ReportField(name="ops/s_min", doi=+1,
compute_fn=lambda _, result: result.ops_per_second.min),
ReportField(name="ops/s_max", doi=+1,
compute_fn=lambda _, result: result.ops_per_second.max),
ReportField(name="ops/s_sd",
compute_fn=lambda _, result: result.ops_per_second.stdev),
ReportField(name="ops/s_rsd", units="%",
compute_fn=lambda _, result: result.ops_per_second.rstdev * 100),
ReportField(name="ops/s_error",
compute_fn=lambda _, result: result.ops_per_second.margin_of_error(confidence=0.999)),
]


def get_report_field_by_name(name: str):
for field in REPORT_FIELDS:
if name == field.name:
return field
raise ValueError(f"Not a valid report field: {name}")


def report_stats(benchmark_spec: BenchmarkSpec, benchmark_result: BenchmarkResult, report_fields: list):
"""
Generate a report for the outcome of a running a benchmark.
@@ -19,69 +79,25 @@ def report_stats(benchmark_spec: BenchmarkSpec, benchmark_result: BenchmarkResul
* `input_file` – the file used for this benchmark
* `format` – the format used for this benchmark
* `memory_usage_peak` – the peak amount of memory allocated while running the benchmark function
* `time_<stat>` – time statistic for the benchmark; `<stat>` can be `mean`, `min`, `max`, `median`, or `p<n>` where
`<n>` is any number from 0 to 100 inclusive.
* `rate_<stat>` – throughput statistic for the benchmark; `<stat>` can be `mean`, `min`, `max`, `median`, or `p<n>`
where `<n>` is any number from 0 to 100 inclusive.
* `time_<stat>` – time statistic for the benchmark, in nanoseconds per operation
* `ops/s_<stat>` – throughput statistic for the benchmark, in operations (invocations of the benchmark function) per second
`<stat>` can be `mean`, `min`, `max`, `sd`, `rsd`, or `error`
:param benchmark_spec: The spec for the benchmark that was run
:param benchmark_result: The output from the benchmark
:param report_fields: list[str] of fields to include in the report.
:return:
"""
if report_fields is None:
report_fields = ['file_size', 'time_min', 'time_mean', 'memory_usage_peak']

result = {'name': benchmark_spec.get_name()}

for field in report_fields:
if isinstance(field, str) and field.startswith("time_"):
# Note–we use `field[len("time_"):]` instead of `removeprefix("time_")` to support python 3.7 and 3.8
stat_value = _calculate_timing_stat(field[len("time_"):], benchmark_result.timings, benchmark_result.batch_size)
result[f'{field}(ns)'] = stat_value
elif isinstance(field, str) and field.startswith("rate_"):
timing_value = _calculate_timing_stat(field[len("rate_"):], benchmark_result.timings, benchmark_result.batch_size)
stat_value = ceil(benchmark_spec.get_input_file_size() * 1024 / (timing_value / benchmark_result.batch_size / 1000000000))
result[f'{field}(kB/s)'] = stat_value
elif field == 'format':
result['format'] = benchmark_spec.get_format()
elif field == 'input_file':
result['input_file'] = os.path.basename(benchmark_spec.get_input_file())
elif field == 'operation':
result['operation'] = benchmark_spec.derive_operation_name()
elif field == 'file_size':
result['file_size(B)'] = benchmark_spec.get_input_file_size()
elif field == 'memory_usage_peak':
result['memory_usage_peak(B)'] = benchmark_result.peak_memory_usage
elif field == 'name':
pass
for field_name in report_fields:
field = get_report_field_by_name(field_name)
if field.units is not None:
key = f"{field.name}({field.units})"
else:
raise ValueError(f"Unrecognized report field '{field}'")
key = field.name
result[key] = field.compute_fn(benchmark_spec, benchmark_result)

return result


def _calculate_timing_stat(stat: str, timings, batch_size):
"""
Calculate a statistic for the given timings.
:param stat: Name of a statistic. Can be `min`, `max`, `median`, `mean`, or `p<N>` where `N` is 0 to 100 exclusive.
:param timings: List of result times from running the benchmark function.
:param batch_size: Number of times the benchmark function was invoked to produce a single timing result.
:return:
"""
if stat.startswith("p"):
n = int(stat[1:])
x = ceil(statistics.quantiles(timings, n=100, method='inclusive')[n-1]/batch_size)
elif stat == 'mean':
x = ceil(sum(timings) / (batch_size * len(timings)))
elif stat == 'min':
x = ceil(min(timings) / batch_size)
elif stat == 'max':
x = ceil(max(timings) / batch_size)
elif stat == 'median':
x = ceil(statistics.median(timings) / batch_size)
else:
raise ValueError(f"Unrecognized statistic {stat}")
return x
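
For reference, a short usage sketch of the new field registry, showing how a field name given on the command line maps to the column key used in the report and by the compare command:

# Illustrative only: fields with units get "(units)" appended to the key,
# fields without units keep the bare name.
from amazon.ionbenchmark.report import get_report_field_by_name

for name in ("ops/s_mean", "file_size", "time_mean"):
    field = get_report_field_by_name(name)
    key = f"{field.name}({field.units})" if field.units is not None else field.name
    print(name, "->", key)
# Prints:
#   ops/s_mean -> ops/s_mean
#   file_size -> file_size(B)
#   time_mean -> time_mean(ns)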
