# GCM Filters Scaling Benchmark

Run on Casper full node (36 cores)

In [1]:
import xarray as xr
xr.__version__

'0.17.0'

In [2]:
#from coiled import performance_report
from dask.distributed import performance_report

In [3]:
import dask
dask.__version__

'2021.04.1'

In [4]:
import numpy as np
np.__version__

'1.20.2'

In [5]:
import dask.array as dsa

In [6]:
import gcm_filters as gf
gf.__version__

'0.1'

In [7]:
from contextlib import contextmanager
import time
import pandas as pd

class DiagnosticTimer:
    """Collect wall-time measurements together with arbitrary metadata.

    Each use of :meth:`time` as a context manager appends one record (a dict)
    to ``self.diagnostics``; :attr:`df` exposes all records as a DataFrame.
    """

    def __init__(self):
        # One dict per completed timed block, in the order they finished.
        self.diagnostics = []

    @contextmanager
    def time(self, **kwargs):
        """Time the body of a ``with`` block and record the result.

        Parameters
        ----------
        **kwargs
            Metadata stored alongside the measurement (e.g. ``operation=...``).
            A ``"runtime"`` key is added (and overwrites any caller-supplied
            ``runtime``) holding the elapsed time in seconds.

        Notes
        -----
        If the body raises, the exception propagates and no record is
        appended, so failed runs never show up as timings.
        """
        # perf_counter is monotonic and high-resolution, so the interval is
        # not affected by system clock adjustments the way time.time() is.
        tic = time.perf_counter()
        yield
        toc = time.perf_counter()
        kwargs["runtime"] = toc - tic
        self.diagnostics.append(kwargs)

    @property
    def df(self):
        """All recorded measurements as a ``pandas.DataFrame`` (one row each)."""
        return pd.DataFrame(self.diagnostics)

In [8]:
def make_data(shape, chunks, gpu=False):
    """Build a random masked test field and its filtered counterpart.

    Parameters
    ----------
    shape : tuple of int
        ``(nt, ny, nx)`` size of the random field; dims are named
        ``('time', 'y', 'x')``.
    chunks : tuple of int
        Chunk sizes passed to ``dask.array.random.random``.
    gpu : bool, optional
        Not supported; ``True`` raises ``NotImplementedError``.

    Returns
    -------
    da_masked, da_filtered
        The random field after ``where(wet_mask)``, and the result of applying
        a ``gcm_filters`` taper filter to it over the ``y`` and ``x`` dims.

    Raises
    ------
    NotImplementedError
        If ``gpu`` is true.
    """
    # Fail fast: no point building arrays we already know we cannot process.
    if gpu:
        raise NotImplementedError("Can't get cupy working :(")

    _, ny, nx = shape
    da = xr.DataArray(dsa.random.random(shape, chunks=chunks), dims=['time', 'y', 'x'])

    # Mask is 1 everywhere except a centred rectangle covering the middle half
    # of each horizontal axis, which is set to 0.
    mask_data = dsa.ones((ny, nx))
    mask_data[(ny // 4):(3 * ny // 4), (nx // 4):(3 * nx // 4)] = 0
    wet_mask = xr.DataArray(mask_data, dims=['y', 'x'])

    da_masked = da.where(wet_mask)

    # Named taper_filter rather than `filter` to avoid shadowing the builtin.
    taper_filter = gf.Filter(
        filter_scale=4,
        dx_min=1,
        filter_shape=gf.FilterShape.TAPER,
        grid_type=gf.GridType.REGULAR_WITH_LAND,
        grid_vars={'wet_mask': wet_mask}
    )

    da_filtered = taper_filter.apply(da_masked, dims=['y', 'x'])
    return da_masked, da_filtered

In [9]:
from dask.distributed import Client, LocalCluster

In [9]:
import subprocess
import logging
from distributed import WorkerPlugin

class PipPlugin(WorkerPlugin):
    """
    Install packages on a worker as it starts up.

    Parameters
    ----------
    packages : List[str]
        A list of packages to install with pip on startup. A single package
        name given as a plain string is also accepted.
    """
    def __init__(self, packages):
        # A bare string would otherwise break the command-line construction
        # in setup(); treat it as a one-element list.
        if isinstance(packages, str):
            packages = [packages]
        self.packages = list(packages)

    def setup(self, worker):
        """Run ``pip install --upgrade`` for ``self.packages`` and log the outcome.

        A failed install is logged as an error but does not raise, so the
        worker still comes up (matching the original best-effort behaviour).
        """
        # Local import keeps the class self-contained when it is shipped to workers.
        import sys

        logger = logging.getLogger("distributed.worker")
        # Use the interpreter running this worker so packages land in the
        # worker's own environment, not whatever `python` is first on PATH.
        python = sys.executable or 'python'
        returncode = subprocess.call(
            [python, '-m', 'pip', 'install', '--upgrade', *self.packages]
        )
        if returncode == 0:
            logger.info("Installed %s", self.packages)
        else:
            logger.error(
                "pip install of %s failed with exit code %s",
                self.packages, returncode,
            )

In [10]:
# Start a new Dask Gateway cluster and get a client connected to it.
from dask_gateway import Gateway
gateway = Gateway()
cluster = gateway.new_cluster()
client = cluster.get_client()

# Now create and register the plugin. We'll install 'gcm-filters' on the
# workers, since make_data() needs gcm_filters to build and apply the filter.
plugin = PipPlugin(['gcm-filters'])
client.register_worker_plugin(plugin)

{}

In [11]:
cluster

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

In [20]:
# weak scaling - problem size scales with number of cores
# For each cluster size, time the mean of the unfiltered field and the mean of
# the filtered field, recording both in diag_timer_weak.

diag_timer_weak = DiagnosticTimer()
max_workers = 20
worker_step = 4
# Worker counts tried: 1, then worker_step, 2*worker_step, ... up to max_workers.
for nworkers in [1] + list(range(worker_step, max_workers + 1, worker_step)):
    # Resize the cluster and wait for the workers before measuring anything.
    cluster.scale(nworkers)
    client.wait_for_workers(nworkers)
    # Guard against timing on a cluster of the wrong size.
    assert len(client.ncores()) == nworkers
    ncores = sum(client.ncores().values())

    # 20 time steps per core, in chunks of 2 time steps covering the full
    # 1024 x 1024 plane, i.e. 10 chunks per core at every cluster size.
    shape = ncores * 20, 1024, 1024
    chunks = (2,) + shape[1:]
    unfiltered, filtered = make_data(shape, chunks)

    # Metadata stored with both timing records for this cluster size.
    details = dict(ncores=ncores, nworkers=nworkers, shape=shape, chunks=chunks,
                   nbytes=filtered.data.nbytes, dtype=str(filtered.dtype))
    # Baseline: mean of the masked random field without filtering.
    with diag_timer_weak.time(operation='unfiltered_mean', **details):
        unfiltered.data.mean().compute()
    # Same reduction on the filtered field.
    with diag_timer_weak.time(operation='filtered_mean', **details):
        filtered.data.mean().compute()
    # Show the two records just added (unfiltered, filtered).
    print(diag_timer_weak.df.iloc[-2:])

         operation  ncores  nworkers             shape           chunks  \
0  unfiltered_mean       2         1  (40, 1024, 1024)  (2, 1024, 1024)   
1    filtered_mean       2         1  (40, 1024, 1024)  (2, 1024, 1024)   

      nbytes    dtype    runtime  
0  335544320  float64   0.515410  
1  335544320  float64  17.958028  
         operation  ncores  nworkers              shape           chunks  \
2  unfiltered_mean       8         4  (160, 1024, 1024)  (2, 1024, 1024)   
3    filtered_mean       8         4  (160, 1024, 1024)  (2, 1024, 1024)   

       nbytes    dtype    runtime  
2  1342177280  float64   7.902542  
3  1342177280  float64  25.111938  
         operation  ncores  nworkers              shape           chunks  \
4  unfiltered_mean      16         8  (320, 1024, 1024)  (2, 1024, 1024)   
5    filtered_mean      16         8  (320, 1024, 1024)  (2, 1024, 1024)   

       nbytes    dtype    runtime  
4  2684354560  float64   8.220016  
5  2684354560  float64  24.8285

In [21]:
from datetime import datetime
from pathlib import Path

# Timestamp to the second (YYYY-MM-DDTHH:MM:SS) so each run gets its own file.
now = datetime.now().isoformat(timespec='seconds')
# Create the output directory if needed; to_csv does not create it and would
# otherwise fail after the (expensive) benchmark has already finished.
Path('data').mkdir(parents=True, exist_ok=True)
diag_timer_weak.df.to_csv(f'data/scaling_weak_cloud_cpu_{now}.csv', index=False)