This notebook explores variability in hail's python (macro)-benchmarks when
said benchmarks are executed on the hail batch service. The analyses within
are based on the methods proposed in [1], albeit slightly modified for long-running
benchmarks. The goals of these analyses are:

- to determine if we can detect slowdowns of 5% or less reliably when running
  benchmarks on hail batch.
- to identify configurations (number of batch jobs x iterations) that allow us
  to detect slowdowns efficiently (i.e. without excessive time and money).

[1] Laaber et al., Software Microbenchmarking in the Cloud. How Bad is it Really?
    https://dl.acm.org/doi/10.1007/s10664-019-09681-1

In [None]:
from pathlib import Path
from typing import Dict, List

import plotly.io as pio
import yaml
from benchmark.tools import annotate_index, maybe
from benchmark.tools.impex import import_benchmarks
from benchmark.tools.plotting import (
    plot_iteration_against_time,
    plot_mean_time_per_instance,
)
from benchmark.tools.statistics import (
    bootstrap_mean_confidence_interval,
    laaber_mds,
    schultz_mds,
    variability,
)
from IPython.display import Pretty, clear_output, display
from plotly.offline import init_notebook_mode

import hail as hl

In [None]:
# All input/output paths below are resolved against the notebook's working directory.
prefix = str(Path.cwd())
hl.init(backend='spark', quiet=True)

# Configure plotly's in-notebook rendering.
init_notebook_mode()
pio.renderers.default = 'notebook_connected'

### Import benchmark data

Benchmarks under `hail/python/benchmarks` are executed with a custom
pytest plugin and their results are written as JSON Lines (.jsonl).

Unscrupulously, we use hail to analyse itself.

In [None]:
# Import the raw results and checkpoint them so later cells can read them back
# with `hl.read_table`. `overwrite=True` keeps this cell re-runnable (a second
# run would otherwise fail because the checkpoint already exists), matching the
# `filtered.ht` checkpoint further down.
with hl.TemporaryDirectory() as tmpdir:
    ht = import_benchmarks(Path(f'{prefix}/in/benchmarks.jsonl'), tmpdir=tmpdir)
    ht = ht.checkpoint(f"{prefix}/out/benchmarks.ht", overwrite=True)

# List every benchmark as `path::name`, sorted for stable output.
benchmarks = ht.aggregate(hl.agg.collect_as_set(ht.path + hl.str('::') + ht.name))
benchmarks = sorted(benchmarks)
print(*benchmarks, sep='\n')

In this next section, we'll estimate the number of iterations required for
a benchmark to reach a steady-state, or the number of so-called "burn-in"
iterations.

In [None]:
# Number of leading ("burn-in") iterations to discard per benchmark, keyed by
# the benchmark's `name` field (without the `path::` prefix).
#
# - Iterations whose index, as assigned by `annotate_index`, is below this value
#   are dropped by `filter_burn_in_iterations`.
# - Benchmarks absent from this dict are excluded from the filtered analysis.
# - Values were picked by eye from the iteration-vs-time plots. The interactive
#   cell below may update them in place.
first_stable_index = {
    'test_analyze_benchmarks': 5,
    'test_block_matrix_to_matrix_table_row_major': 4,
    'test_blockmatrix_write_from_entry_expr_range_mt_standardize': 8,
    'test_blockmatrix_write_from_entry_expr_range_mt': 10,
    'test_concordance': 2,
    'test_export_range_matrix_table_col_p100': 15,
    'test_export_range_matrix_table_entry_field_p100': 3,
    'test_export_range_matrix_table_row_p100': 8,
    'test_export_vcf': 8,
    'test_genetics_pipeline': 4,
    'test_gnomad_coverage_stats_optimized': 5,
    'test_gnomad_coverage_stats': 5,
    'test_group_by_collect_per_row': 5,
    'test_group_by_take_rekey': 10,
    'test_hwe_normalized_pca_blanczos_small_data_0_iterations': 8,
    'test_hwe_normalized_pca_blanczos_small_data_10_iterations': 8,
    'test_hwe_normalized_pca': 6,
    'test_import_and_transform_gvcf': 2,
    'test_import_bgen_filter_count': 18,
    'test_import_bgen_force_count_all': 4,
    'test_import_bgen_force_count_just_gp': 20,
    'test_import_bgen_info_score': 12,
    'test_import_gvcf_force_count': 2,
    'test_import_vcf_count_rows': 1,
    'test_import_vcf_write': 5,
    'test_join_partitions_table[10-10]': 4,
    'test_join_partitions_table[10-100]': 2,
    'test_join_partitions_table[10-1000]': 5,
    'test_join_partitions_table[100-10]': 10,
    'test_join_partitions_table[100-100]': 10,
    'test_join_partitions_table[100-1000]': 8,
    'test_join_partitions_table[1000-10]': 12,
    'test_join_partitions_table[1000-100]': 10,
    'test_join_partitions_table[1000-1000]': 8,
    'test_kyle_sex_specific_qc': 6,
    'test_large_range_matrix_table_sum': 5,
    'test_ld_prune_profile_25': 10,
    'test_linear_regression_rows': 10,
    'test_logistic_regression_rows_wald': 5,
    'test_make_ndarray': 5,
    'test_matrix_table_aggregate_entries': 8,
    'test_matrix_table_array_arithmetic': 20,
    'test_matrix_table_call_stats_star_star': 8,
    'test_matrix_table_cols_show': 5,
    'test_matrix_table_decode_and_count_just_gt': 5,
    'test_matrix_table_decode_and_count': 8,
    'test_matrix_table_entries_show': 4,
    'test_matrix_table_entries_table_no_key': 4,
    'test_matrix_table_entries_table': 10,
    'test_matrix_table_filter_entries_unfilter': 8,
    'test_matrix_table_filter_entries': 6,
    'test_matrix_table_many_aggs_col_wise': 3,
    'test_matrix_table_many_aggs_row_wise': 2,
    'test_matrix_table_nested_annotate_rows_annotate_entries': 4,
    'test_matrix_table_rows_force_count': 20,
    'test_matrix_table_rows_is_transition': 5,
    'test_matrix_table_rows_show': 10,
    'test_matrix_table_scan_count_cols_2': 20,
    'test_matrix_table_scan_count_rows_2': 5,
    'test_matrix_table_show': 7,
    'test_matrix_table_take_col': 10,
    'test_matrix_table_take_entry': 8,
    'test_matrix_table_take_row': 10,
    'test_minimal_detectable_slowdown[laaber_mds]': 5,
    'test_minimal_detectable_slowdown[schultz_mds]': 6,
    'test_mt_group_by_memory_usage': 5,
    'test_mt_localize_and_collect': 5,
    'test_ndarray_addition': 10,
    'test_ndarray_matmul_float64': 6,
    'test_ndarray_matmul_int64': 10,
    'test_pc_relate_5k_5k': 4,
    'test_pc_relate': 3,
    'test_per_row_stats_star_star': 10,
    'test_python_only_10k_combine': 6,
    'test_python_only_10k_transform': 10,
    'test_read_decode_gnomad_coverage': 10,
    'test_read_force_count_partitions[10]': 10,
    'test_read_force_count_partitions[100]': 8,
    'test_read_force_count_partitions[1000]': 10,
    'test_read_with_index[1000]': 8,
    'test_sample_qc': 3,
    'test_sentinel_cpu_hash_1': 5,
    'test_sentinel_read_gunzip': 10,
    'test_shuffle_key_by_aggregate_bad_locality': 8,
    'test_shuffle_key_by_aggregate_good_locality': 5,
    'test_shuffle_key_rows_by_4096_byte_rows': 2,
    'test_shuffle_key_rows_by_65k_byte_rows': 4,
    'test_shuffle_key_rows_by_mt': 10,
    'test_shuffle_order_by_10m_int': 10,
    'test_split_multi_hts': 10,
    'test_split_multi': 8,
    'test_sum_table_of_ndarrays': 5,
    'test_table_aggregate_approx_cdf': 4,
    'test_table_aggregate_array_sum': 5,
    'test_table_aggregate_counter': 10,
    'test_table_aggregate_downsample_dense': 5,
    'test_table_aggregate_downsample_worst_case': 10,
    'test_table_aggregate_int_stats': 5,
    'test_table_aggregate_linreg': 8,
    'test_table_aggregate_take_by_strings': 10,
    'test_table_annotate_many_flat': 5,
    'test_table_big_aggregate_compilation': 4,
    'test_table_big_aggregate_compile_and_execute': 2,
    'test_table_expr_take': 25,
    'test_table_foreign_key_join[1000000-1000]': 3,
    'test_table_foreign_key_join[1000000-1000000]': 3,
    'test_table_group_by_aggregate_sorted': 8,
    'test_table_group_by_aggregate_unsorted': 7,
    'test_table_import_ints_impute': 8,
    'test_table_import_ints': 4,
    'test_table_import_strings': 3,
    'test_table_key_by_shuffle': 3,
    'test_table_python_construction': 10,
    'test_table_range_array_range_force_count': 6,
    'test_table_range_force_count': 10,
    'test_table_range_join[1000000000-1000]': 20,
    'test_table_range_join[1000000000-1000000000]': 10,
    'test_table_range_means': 10,
    'test_table_read_force_count_ints': 5,
    'test_table_read_force_count_strings': 4,
    'test_table_scan_prev_non_null': 20,
    'test_table_scan_sum_1k_partitions': 2,
    'test_table_show': 12,
    'test_table_take': 20,
    'test_test_head_and_tail_region_memory': 10,
    'test_test_inner_join_region_memory': 10,
    'test_test_left_join_region_memory': 10,
    'test_test_map_filter_region_memory': 10,
    'test_union_partitions_table[10-10]': 4,
    'test_union_partitions_table[10-100]': 5,
    'test_union_partitions_table[10-1000]': 8,
    'test_union_partitions_table[100-10]': 16,
    'test_union_partitions_table[100-100]': 5,
    'test_union_partitions_table[100-1000]': 15,
    'test_union_partitions_table[1000-10]': 10,
    'test_union_partitions_table[1000-100]': 6,
    'test_union_partitions_table[1000-1000]': 15,
    'test_variant_and_sample_qc_nested_with_filters_2': 2,
    'test_variant_and_sample_qc_nested_with_filters_4_counts': 8,
    'test_variant_and_sample_qc_nested_with_filters_4': 10,
    'test_variant_and_sample_qc': 10,
    'test_variant_qc': 5,
    'test_vds_combiner_chr22': 10,
    'test_write_profile_mt': 8,
    'test_write_range_matrix_table_p100': 2,
    'test_write_range_table[10000000-10]': 3,
    'test_write_range_table[10000000-100]': 3,
    'test_write_range_table[10000000-1000]': 6,
}

Short of an accurate algorithm for computing this, you, noble reader, are
tasked with the mind-numbing job of looking at graphs and picking numbers.
This is an iterative process and you'll likely lose the will to live midway.

Persevere, friend. Your sacrifice will not go unrewarded.

In what follows, you'll be shown two plots. On top will be the unfiltered
benchmark times vs iteration for all batch jobs. The plot below will show the
same benchmark with the burn-in iterations you selected previously removed.

You'll be prompted to enter a new first stable index for each benchmark until
you arrive at a fixed point. Note that some benchmarks never really reach
stability. In this case try to pick a value that compromises between cost and
accuracy (i.e. if a benchmark is really slow, we don't want to be doing tons
of burn-in iterations, whereas for a fast benchmark we could justify more).

Good luck.

In [None]:
ht = hl.read_table(f'{prefix}/out/benchmarks.ht')
names: List[str] = ht.aggregate(hl.agg.collect_as_set(ht.name))  # type: ignore
names = sorted(names)

# Render with the plain 'notebook' renderer while reviewing. This setting is
# loop-invariant, so set it once rather than once per figure.
pio.renderers.default = 'notebook'

# Review benchmarks until a fixed point is reached: each pass re-plots only the
# benchmarks whose first stable index was changed during the previous pass.
interrupted = False
while names and not interrupted:
    pending, names = names, []
    for fig in plot_iteration_against_time(ht, pending, first_stable_index):
        clear_output(wait=True)

        name: str = fig.labels.title  # type: ignore
        cur_index = first_stable_index.get(name)

        try:
            fig.show()
        except Exception as e:
            # Skip figures that fail to render, but report it rather than
            # silently swallowing the error. Catching `Exception` (not a bare
            # `except:`) lets KeyboardInterrupt through.
            print(f'Failed to render {name!r}: {e}')
            continue

        try:
            new_index = maybe(int, input('Enter the first stable index (or blank keep same)') or None)
        except KeyboardInterrupt:
            # `break` alone only leaves the inner loop; the outer `while` would
            # then start another pass. Flag the interrupt so both loops stop.
            interrupted = True
            break
        except ValueError:
            # Presumably raised by `int` on non-numeric input: re-queue the
            # benchmark so it is shown again on the next pass.
            print(f'Expected an integer; {name!r} will be shown again.')
            names.append(name)
            continue

        if new_index is not None and new_index != cur_index:
            first_stable_index[name] = new_index
            names.append(name)

first_stable_index

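As a final step of cleaning, we'll drop the burn-in and failed iterations,
filter out iterations whose time differs from their instance's median by more
than some multiplicative factor, and discard instances (and benchmarks) that
are left with too few data points.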

In [None]:
def filter_burn_in_iterations(ht: hl.Table, first_stable_index: Dict[str, int]) -> hl.Table:
    """Drop each benchmark's burn-in iterations.

    The mapping is stored on the table as the global ``first_stable_index``.
    For every row, each instance then keeps only the iterations whose index
    (as assigned by ``annotate_index``) is at least the row's entry in that
    mapping. Benchmarks missing from the mapping use 0, i.e. keep everything.
    The result is ``select``-ed down to the ``instances`` field.
    """
    ht = ht.annotate_globals(first_stable_index=first_stable_index)
    burn_in = ht.first_stable_index.get(ht.name, 0)

    def drop_burn_in(instance):
        indexed = annotate_index(instance.iterations)
        return instance.annotate(iterations=hl.filter(lambda it: it.idx >= burn_in, indexed))

    return ht.select(instances=ht.instances.map(drop_burn_in))


def filter_outliers(ht: hl.Table, factor: hl.Float64Expression) -> hl.Table:
    """Drop iterations whose time is far from their instance's median time.

    An iteration is kept when ``max(time, median) / min(time, median) < factor``.
    That is, it is kept when its time is within a multiplicative ``factor`` of
    the median iteration time of its instance, in either direction.

    The filtered list replaces each instance's ``iterations`` field, which is
    what the downstream steps read. The result is ``select``-ed down to the
    ``instances`` field.
    """

    def drop_outliers(instance):
        times = instance.iterations.map(lambda t: t.time)
        return instance.annotate(
            # Must overwrite `iterations`: writing to a new field would leave
            # the data every later step consumes unfiltered.
            iterations=hl.bind(
                lambda median: instance.iterations.filter(
                    lambda t: hl.max([t.time, median]) / hl.min([t.time, median]) < factor
                ),
                hl.median(times),
            )
        )

    return ht.select(instances=ht.instances.map(drop_outliers))


def filter_names(ht: hl.Table, names: hl.SetExpression) -> hl.Table:
    """Return only the rows of ``ht`` whose ``name`` field is a member of ``names``."""
    is_selected = names.contains(ht.name)
    return ht.filter(is_selected)


def filter_failed_iterations(ht: hl.Table) -> hl.Table:
    """Drop iterations that timed out or recorded a failure.

    An iteration is kept only if it did not time out *and* its ``failure``
    field is missing. All other row fields are preserved (``annotate``).
    """

    def succeeded(t):
        # Both conditions must hold. Combining them with `|` would keep
        # iterations that failed without timing out (and vice versa).
        return ~t.timed_out & hl.is_missing(t.failure)

    return ht.annotate(
        instances=ht.instances.map(
            lambda i: i.annotate(iterations=hl.filter(succeeded, i.iterations)),
        ),
    )


def filter_non_viable_instances(
    ht: hl.Table,
    ninstances: hl.Int32Expression,
    niterations: hl.Int32Expression,
) -> hl.Table:
    """Discard instances and benchmarks that have too little data.

    First removes every instance with fewer than ``niterations`` iterations,
    then removes every row left with fewer than ``ninstances`` instances.
    """

    def has_enough_iterations(instance):
        return hl.len(instance.iterations) >= niterations

    pruned = ht.annotate(instances=hl.filter(has_enough_iterations, ht.instances))
    return pruned.filter(hl.len(pruned.instances) >= ninstances)


# Cleaning thresholds.
OUTLIER_FACTOR = hl.float64(10)  # keep iterations within 10x of their instance's median time
MIN_INSTANCES = hl.int(50)  # drop benchmarks with fewer viable instances than this
MIN_ITERATIONS = hl.int(50)  # drop instances with fewer clean iterations than this

ht = hl.read_table(f'{prefix}/out/benchmarks.ht')
# Named `all_names`, not `all`, to avoid shadowing the builtin for the rest of the notebook.
all_names: List[str] = ht.aggregate(hl.agg.collect_as_set(ht.name))  # type: ignore

ht = filter_names(ht, hl.set([n for n in all_names if n in first_stable_index]))
ht = filter_burn_in_iterations(ht, first_stable_index)
ht = filter_failed_iterations(ht)
ht = filter_outliers(ht, OUTLIER_FACTOR)
ht = filter_non_viable_instances(ht, MIN_INSTANCES, MIN_ITERATIONS)
ht = ht.checkpoint(f'{prefix}/out/filtered.ht', overwrite=True)

benchmarks = ht.aggregate(hl.agg.collect_as_set(ht.name))

# `benchmarks` is already a set, so test membership directly rather than
# rebuilding a set per name. Sort the removed names for stable output.
print('Filtered:', *sorted(n for n in all_names if n not in benchmarks), sep='\n')

In [None]:
# Mean time per instance. Several distinct horizontal layers in a plot suggest
# that the batch jobs ran on differing instance types.

ht = hl.read_table(f'{prefix}/out/filtered.ht')
figures = plot_mean_time_per_instance(ht)
for figure in figures:
    figure.show()

In [None]:
# laaber et al. section 4
# Reduce each instance to its list of iteration times, then summarise variability.
filtered = hl.read_table(f'{prefix}/out/filtered.ht')
ht = filtered.select(instances=filtered.instances.iterations.time)
variability_ht = ht.select(variability=variability(ht))
variability_ht.show()

In [None]:
# laaber et al. section 5 - bootstrapping confidence intervals of the mean

# Re-derive the per-instance iteration times here instead of relying on `ht` as
# left behind by the previous cell, so this cell can be re-run on its own.
ht = hl.read_table(f'{prefix}/out/filtered.ht')
ht = ht.select(instances=ht.instances.iterations.time)

# NOTE(review): the positional arguments are presumably the number of bootstrap
# resamples and the confidence level; confirm against benchmark.tools.statistics.
bootstrap_mean_confidence_interval(ht, 1000, 0.95).show()

In [None]:
# Optionally, use QoB for the next section
# (i.e. restart hail on its 'batch' backend so the MDS computation below runs on
# the batch service rather than on local spark.)
hl.stop()
hl.init(backend='batch')

# Stage the filtered table under the batch backend's remote tmpdir so it can be
# read from there.
new_prefix = hl.current_backend().remote_tmpdir
# NOTE(review): `filtered.ht` is a directory; confirm `fs.copy` copies it recursively.
hl.current_backend().fs.copy(f'{prefix}/out/filtered.ht', f'{new_prefix}/out/filtered.ht')
# Swap: `prefix` now points at the remote location and `new_prefix` remembers
# the local one, which the copy-back cell below relies on.
prefix, new_prefix = new_prefix, prefix

In [None]:
# Laaber et al - Minimal-Detectable Slowdown
# Compute both MDS estimators from the per-instance iteration times and persist
# them. `overwrite=True` lets the cell be re-run without failing on existing output.

ht = hl.read_table(f'{prefix}/out/filtered.ht')
ht = ht.select(instances=ht.instances.iterations.time)

laaber_mds(ht).write(f'{prefix}/out/laaber-mds.ht', overwrite=True)
schultz_mds(ht).write(f'{prefix}/out/schultz-mds.ht', overwrite=True)

In [None]:
# Switch back to local spark for plotting.
# Copy the MDS tables written by the previous cell (`{mds}-mds.ht`) from the
# remote prefix back to the local one, where the next cell reads them.
# Only run this cell if the QoB cell above was run: it relies on `prefix` being
# the remote location and `new_prefix` holding the local one.
for mds in ['laaber', 'schultz']:
    hl.current_backend().fs.copy(f'{prefix}/out/{mds}-mds.ht', f'{new_prefix}/out/{mds}-mds.ht')

prefix, new_prefix = new_prefix, prefix
hl.stop()
hl.init(backend='spark')

In [None]:
# Per-benchmark summary: mean iteration time across all instances and the total
# coefficient of variation reported by `variability`.
summary = hl.read_table(f'{prefix}/out/filtered.ht')
summary = summary.select(instances=summary.instances.iterations.time)
summary = summary.select(
    mean=summary.instances.aggregate(lambda k: hl.agg.explode(hl.agg.mean, k)),
    cv=variability(summary).total,
)
benchmarks = summary.aggregate(hl.agg.collect((summary.path, summary.name, summary.mean, summary.cv)))


def read_mds(method: str) -> hl.Table:
    """Read the `{method}-mds.ht` table keyed by benchmark and configuration.

    Keeps only the `ibs` and `tbs` fields. The table is re-keyed with
    `_key_by_assert_sorted`, which assumes it is already sorted by
    (path, name, slowdown, ninstances, niterations).
    """
    mds_ht = hl.read_table(f'{prefix}/out/{method}-mds.ht')
    mds_ht = mds_ht._key_by_assert_sorted('path', 'name', 'slowdown', 'ninstances', 'niterations')
    return mds_ht.select('ibs', 'tbs')


laaber, schultz = read_mds('laaber'), read_mds('schultz')

# Place both estimators side by side, joined on the shared key.
mds = laaber.select(laaber=laaber.row_value, schultz=schultz[laaber.key])

def prompt_config():
    """Ask for the `ninstances, niterations` configuration to inspect.

    Returns `(ninstances, niterations)` as ints, or None for a blank reply.
    Re-prompts on malformed input instead of aborting the whole review loop.
    """
    while True:
        config = input('Enter `ninstances, niterations` that minimises the likelihood of detecting a false positive.')
        if not config:
            return None
        try:
            m, n = [int(x.strip()) for x in config.split(',')]
        except ValueError:
            print(f'Expected two comma-separated integers, got {config!r}.')
            continue
        return m, n


for path, name, rt, cv in benchmarks:
    info = Pretty(
        yaml.dump(
            data=[
                {
                    'item': f'{path}::{name}',
                    'burn_in': first_stable_index[name],
                    'mean': float(f'{rt:.3f}'),
                    'cv': float(f'{cv:.3f}'),
                    'config': [],
                }
            ],
            sort_keys=False,
        )
    )

    display(info, clear=True)

    # Rows with slowdown == 1 have no slowdown, so any detection there is a false positive.
    t = mds.filter((mds.path == path) & (mds.name == name))
    t.filter(t.slowdown == 1).show(100_000)

    try:
        config = prompt_config()
        if config is None:
            continue

        display(info, clear=True)

        # Now identify a configuration of instances, iterations and burn-in iterations
        m, n = config
        # NOTE(review): this tuple comparison is presumably lexicographic, so it
        # also admits e.g. (m + 1, 1). If "at least m instances AND at least n
        # iterations" is intended, use (t.ninstances >= m) & (t.niterations >= n).
        t.filter((t.slowdown > 1) & (hl.tuple([t.ninstances, t.niterations]) >= (m, n))).show(100_000)

        input()
    except KeyboardInterrupt as _:
        break