In [None]:
import numpy as np
import ipyparallel as ipp
from qp.metrics.point_estimate_metric_classes import (
    PointSigmaIQR,
    PointBias,
    PointSigmaMAD,
    PointOutlierRate,
)

In [None]:
# Reproducible synthetic inputs: two log-normal samples from one seeded generator
SEED = 1002330
rng = np.random.default_rng(SEED)

# The samples are later split into n_chunk pieces of chunk_size values each
n_chunk = 10
chunk_size = 10_000
total_size = chunk_size * n_chunk

# Both arrays come from the same generator, so the draw order
# (estimate first, then reference) determines their values
estimate = rng.lognormal(
    mean=1.0,
    sigma=2,
    size=total_size,
)
reference = rng.lognormal(
    mean=1.3,
    sigma=1.9,
    size=total_size,
)

In [None]:
# Split a sequence into consecutive fixed-size pieces (used for estimate and reference)
def chunker(seq, size):
    """Return a generator over consecutive slices of ``seq``, ``size`` items each.

    The final slice is shorter when ``len(seq)`` is not a multiple of ``size``.
    """
    starts = range(0, len(seq), size)
    return (seq[start:start + size] for start in starts)

# Create iterators that yield chunks of chunk_size elements.
# NOTE(review): the set-up cells further down call chunker(...) again rather
# than reusing these two names.
estimate_chunks = chunker(estimate, chunk_size)
reference_chunks = chunker(reference, chunk_size)

In [None]:
# Worker function: called once per data chunk
def mpi_example(chunk):
    """Run the accumulate step of a metric estimator on one chunk.

    ``chunk`` is indexed as ``(estimator, estimated_values, reference_values)``;
    whatever ``estimator.accumulate`` returns is handed back unchanged.
    """
    accumulate = chunk[0].accumulate
    return accumulate(chunk[1], chunk[2])

Here is a function that configures a local cluster of 4 engines, launched with MPI.

The function takes a list of 3-tuple "data chunks"; the metric estimator is carried inside each tuple rather than passed as a separate argument.

Each 3-tuple is (metric estimator instance, chunk_of_estimated_values, chunk_of_reference_values).

In [None]:
def run_parallel_metric(data_chunks):
    """Compute a metric over chunked data on a local cluster of 4 MPI engines.

    Parameters
    ----------
    data_chunks : list of tuple
        Each entry is ``(metric_estimator, estimate_chunk, reference_chunk)``.
        The estimator from the first entry is used to combine the per-chunk
        results, so all entries are expected to carry an equivalent estimator.

    Returns
    -------
    None
        The shape of every per-chunk result and the final metric value are
        printed.

    Raises
    ------
    IndexError
        If ``data_chunks`` is empty (raised before any engines are started).
    """
    # Look up the estimator first so an empty list fails before engines launch
    metric_estimator = data_chunks[0][0]

    with ipp.Cluster(controller_ip="*", engines="mpi", n=4) as rc:
        # use a load-balanced view so the chunks are scheduled across the engines
        view = rc.load_balanced_view()
        # apply mpi_example to every chunk, in parallel on the engines
        asyncresult = view.map_async(mpi_example, data_chunks)
        # wait (with a progress display) until every chunk has been processed
        asyncresult.wait_interactive()
        # retrieve the actual per-chunk results
        result = asyncresult.get()
        # report the shape of each per-chunk result; np.shape also accepts
        # array-likes that are not already ndarrays
        for i, res in enumerate(result):
            print(f"{i} : {np.shape(res)}")
        # combine the per-chunk results into the final metric value
        final = metric_estimator.finalize(centroids=result)
        print(final)

### An example running the PointSigmaIQR metric directly and in parallel

In [None]:
# Build the ipyparallel work items: one (estimator, estimate chunk, reference chunk)
# tuple per chunk
config = {'tdigest_compression': 1000}

sigma_iqr_estimator = PointSigmaIQR(**config)
sigma_iqr_estimator_list = [sigma_iqr_estimator] * n_chunk
iqr_data_chunks = list(
    zip(
        sigma_iqr_estimator_list,
        chunker(estimate, chunk_size),
        chunker(reference, chunk_size),
    )
)


In [None]:
# Direct evaluation: a single call on the full arrays, estimator built with no arguments
PointSigmaIQR().evaluate(estimate, reference)

In [None]:
# Parallel evaluation of the same metric from the chunked work items
run_parallel_metric(iqr_data_chunks)

### An example running the PointBias metric directly and in parallel

In [None]:
# Build the ipyparallel work items: one (estimator, estimate chunk, reference chunk)
# tuple per chunk
config = {'tdigest_compression': 1000}

point_bias_estimator = PointBias(**config)
point_bias_estimator_list = [point_bias_estimator] * n_chunk
point_bias_data_chunks = list(
    zip(
        point_bias_estimator_list,
        chunker(estimate, chunk_size),
        chunker(reference, chunk_size),
    )
)

In [None]:
# Direct evaluation: a single call on the full arrays, estimator built with no arguments
PointBias().evaluate(estimate, reference)

In [None]:
# Parallel evaluation of the same metric from the chunked work items
run_parallel_metric(point_bias_data_chunks)

### An example running PointSigmaMAD directly and in parallel

In [None]:
# PointSigmaMAD work items: one (estimator, estimate chunk, reference chunk)
# tuple per chunk
config = {'num_bins': 1_000_000, 'tdigest_compression': 1000}
point_sigma_mad_estimator = PointSigmaMAD(**config)
point_sigma_mad_estimator_list = [point_sigma_mad_estimator] * n_chunk
point_sigma_mad_data_chunks = list(
    zip(
        point_sigma_mad_estimator_list,
        chunker(estimate, chunk_size),
        chunker(reference, chunk_size),
    )
)

In [None]:
# Direct evaluation: a single call on the full arrays, estimator built with no arguments
PointSigmaMAD().evaluate(estimate, reference)

The cell below allows for adjustment of the `num_bins` parameter.

Larger values trend closer to the analytic result from the cell above.

In [None]:
# Single-process accumulate/finalize on the full arrays, with `num_bins` set explicitly
config = {'num_bins': 1_000_000, 'tdigest_compression': 1000}
psmad = PointSigmaMAD(**config)
# One accumulate call over all of the data (no chunking)
centroids = psmad.accumulate(estimate, reference)

# default value for `num_bins` is 1_000_000
# finalize is given a list of accumulate results; here the list has one element
psmad.finalize(centroids=[centroids])

In [None]:
# Parallel evaluation of the same metric from the chunked work items
run_parallel_metric(point_sigma_mad_data_chunks)

### An example running PointOutlierRate metric directly and in parallel

In [None]:
# PointOutlierRate work items: one (estimator, estimate chunk, reference chunk)
# tuple per chunk
config = {'tdigest_compression': 1000}
point_outlier_estimator = PointOutlierRate(**config)
point_outlier_estimator_list = [point_outlier_estimator] * n_chunk
point_outlier_data_chunks = list(
    zip(
        point_outlier_estimator_list,
        chunker(estimate, chunk_size),
        chunker(reference, chunk_size),
    )
)

In [None]:
# Direct evaluation: a single call on the full arrays, estimator built with no arguments
PointOutlierRate().evaluate(estimate, reference)

The parallel estimate of the metric trends closer to the analytic result as the value of `tdigest_compression` is increased.

The default value for compression is 1000. If set to 10_000, the estimate becomes 0.13663.

Note that, of course, setting compression = 10_000 increases memory usage, with minimal effect on runtime.

In [None]:
# Single-process accumulate/finalize on the full arrays; adjust
# 'tdigest_compression' here to see its effect on the estimate
config = {'tdigest_compression': 1000}
por = PointOutlierRate(**config)
# One accumulate call over all of the data (no chunking)
centroids = por.accumulate(estimate, reference)

# finalize is given a list of accumulate results; here the list has one element
por.finalize(centroids=[centroids])

In [None]:
# Parallel evaluation of the same metric from the chunked work items
run_parallel_metric(point_outlier_data_chunks)