In [None]:
import numpy as np
import ipyparallel as ipp
from qp.metrics.point_estimate_metric_classes import (
    PointSigmaIQR,
    PointBias,
)

In [None]:
# Reproducible synthetic data: a single seeded generator drives every draw below.
SEED = 1002330
rng = np.random.default_rng(SEED)

# The full sample is split into n_chunk pieces of chunk_size elements each.
n_chunk = 10
chunk_size = 10_000
total_size = chunk_size * n_chunk

# Draw order matters: `estimate` is sampled before `reference`, so swapping
# these two lines would change both arrays for the same SEED.
estimate = rng.lognormal(mean=1.0, sigma=2, size=total_size)
reference = rng.lognormal(mean=1.3, sigma=1.9, size=total_size)

In [None]:
#generator that yields chunks from estimate and reference
def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

# Lazy, single-use iterators over chunk_size-element slices of each sample.
# NOTE(review): nothing later in this notebook iterates these two generators;
# the data-setup cell calls chunker() again to get fresh ones.
estimate_chunks, reference_chunks = (
    chunker(estimate, chunk_size),
    chunker(reference, chunk_size),
)

In [None]:
# Worker-side task that the ipyparallel engines run once per data chunk.
def mpi_example(chunk):
    """Run one estimator's ``accumulate`` step on a single data chunk.

    Parameters
    ----------
    chunk : sequence
        ``(estimator, estimate_chunk, reference_chunk)``; only the first three
        items are read.

    Returns
    -------
    object
        Whatever ``estimator.accumulate(estimate_chunk, reference_chunk)``
        returns. The driver cells collect these per-chunk values and pass the
        list to ``finalize(centroids=...)``.
    """
    accumulate = chunk[0].accumulate
    estimate_part, reference_part = chunk[1], chunk[2]
    return accumulate(estimate_part, reference_part)

In [None]:
# Build the ipyparallel task lists: one (estimator, estimate_chunk,
# reference_chunk) tuple per chunk, in the layout mpi_example reads.

# PointSigmaIQR tasks; every tuple refers to the same estimator object.
sigma_iqr_estimator = PointSigmaIQR()
sigma_iqr_estimator_list = [sigma_iqr_estimator] * n_chunk
iqr_data_chunks = list(
    zip(
        sigma_iqr_estimator_list,
        chunker(estimate, chunk_size),
        chunker(reference, chunk_size),
    )
)

# PointBias tasks, built the same way.
point_bias_estimator = PointBias()
point_bias_estimator_list = [point_bias_estimator] * n_chunk
point_bias_data_chunks = list(
    zip(
        point_bias_estimator_list,
        chunker(estimate, chunk_size),
        chunker(reference, chunk_size),
    )
)

The following cell is a hardcoded ipyparallel driver for PointSigmaIQR. It runs `accumulate` on each data chunk in parallel, then merges the partial results with `finalize`.

In [None]:
# Start a 4-engine MPI cluster for this cell only; leaving the `with` block
# shuts the cluster down again.
# NOTE(review): controller_ip="*" makes the controller listen on all
# interfaces — confirm that is intended outside a trusted network.
with ipp.Cluster(controller_ip="*", engines="mpi", n=4) as rc:
    # A load-balanced view hands each task to whichever engine is idle.
    # There are more chunks (n_chunk) than engines, so engines process several.
    view = rc.load_balanced_view()
    # Run mpi_example (each estimator's accumulate step) on every chunk.
    asyncresult = view.map_async(mpi_example, iqr_data_chunks)
    # Show a progress widget and block until every task has finished.
    asyncresult.wait_interactive()
    # One partial result per chunk, in the same order as iqr_data_chunks.
    result = asyncresult.get()
    for i, res in enumerate(result):
        # np.asarray gives a .shape even if an engine returned a list/tuple,
        # and does not copy when res is already an ndarray.
        print(f"{i} : {np.asarray(res).shape}")
    # Merge the per-chunk partial results into the final metric value.
    final = sigma_iqr_estimator.finalize(centroids=result)
    print(final)

Here the same driver is wrapped in a function, so it can run any metric estimator on any list of data chunks.

In [None]:
def run_parallel_metric(estimator, data_chunks, n_engines=4, verbose=True):
    """Evaluate a point-estimate metric chunk-by-chunk on an MPI ipyparallel cluster.

    Each item of ``data_chunks`` is sent to an engine and passed to
    ``mpi_example``, which calls that item's estimator ``accumulate`` method.
    The collected per-chunk results are then merged locally with
    ``estimator.finalize``.

    Parameters
    ----------
    estimator : metric object
        Supplies the ``finalize(centroids=...)`` call that merges the
        per-chunk results. Assumed to be the same metric class as the
        estimators inside ``data_chunks`` — TODO confirm against qp.
    data_chunks : list of tuple
        ``(estimator, estimate_chunk, reference_chunk)`` tuples, as built in
        the data-setup cell.
    n_engines : int, optional
        Number of MPI engines to start (default 4).
    verbose : bool, optional
        If True (default), print the shape of each per-chunk result.

    Returns
    -------
    object
        The value returned by ``estimator.finalize``. It is returned rather
        than printed, so a calling cell displays it exactly like the direct
        ``evaluate`` cells do.
    """
    with ipp.Cluster(controller_ip="*", engines="mpi", n=n_engines) as rc:
        # Load-balanced view: each chunk goes to whichever engine is idle.
        view = rc.load_balanced_view()
        asyncresult = view.map_async(mpi_example, data_chunks)
        # Progress widget; blocks until every chunk has been processed.
        asyncresult.wait_interactive()
        # Per-chunk partial results, in the same order as data_chunks.
        result = asyncresult.get()

    # The results are already local, so the cluster can be released before
    # finalizing.
    if verbose:
        for i, res in enumerate(result):
            # np.asarray gives a .shape even if res is a list/tuple.
            print(f"{i} : {np.asarray(res).shape}")
    return estimator.finalize(centroids=result)

Below, the parallel PointSigmaIQR run from the hardcoded cell is repeated using the function.

For comparison, PointSigmaIQR is also evaluated directly on the full, unchunked arrays.

In [None]:
# Direct, single-process evaluation on the full arrays, for comparison.
direct_sigma_iqr = PointSigmaIQR()
direct_sigma_iqr.evaluate(estimate, reference)

In [None]:
# The same metric, computed chunk-by-chunk on the MPI cluster.
run_parallel_metric(
    PointSigmaIQR(),
    iqr_data_chunks,
)

An example running the PointBias metric directly and in parallel.

In [None]:
# Direct, single-process evaluation of PointBias on the full arrays.
direct_point_bias = PointBias()
direct_point_bias.evaluate(estimate, reference)

In [None]:
# PointBias computed chunk-by-chunk on the MPI cluster.
run_parallel_metric(
    PointBias(),
    point_bias_data_chunks,
)