### Parallel Computation of Measures 

This notebook provides an example of setting up a PBSCluster and running applying a measure function on the list of MHWs in events data array.

#### Setting Up Cluster

In [1]:
pip install dask-jobqueue --quiet

Note: you may need to restart the kernel to use updated packages.


In [39]:
pip install xarray --quiet

Note: you may need to restart the kernel to use updated packages.


In [43]:
# pip install netcdf4

In [1]:
import dask

In [2]:
from dask_jobqueue import PBSCluster

In [3]:
from dask.distributed import Client

In [4]:
from dask import delayed

In [5]:
import numpy as np
import xarray as xr

In [6]:
# Setup your PBSCluster
cluster = PBSCluster(
    cores=4, # The number of cores you want
    memory='32GB', # Amount of memory
    processes=1, # How many processes
    queue='casper', # The type of queue to utilize (/glade/u/apps/dav/opt/usr/bin/execcasper)
    local_directory='$TMPDIR', # Use your local directory
    resource_spec='select=1:ncpus=6:mem=32GB', # Specify resources
    project='UWIS0040', # Input your project ID here
    walltime='02:00:00', # Amount of wall time
    interface='ib0', # Interface to use
)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 39216 instead


In [7]:
# Scale up
cluster.scale(4)



In [8]:
cluster

Tab(children=(HTML(value='<div class="jp-RenderedHTMLCommon jp-RenderedHTML jp-mod-trusted jp-OutputArea-outpu…

In [9]:
# Change your url to the dask dashboard so you can see it
dask.config.set({'distributed.dashboard.link':'https://jupyterhub.hpc.ucar.edu/stable/user/{USER}/proxy/{port}/status'})

# Setup your client
client = Client(cluster)

#### Reading Data

In [10]:
data_path = '/glade/u/home/cassiacai/marine_heatwaves/notebooks/SSTA_and_events_0_3.nc'

In [11]:
# read as dask array when chunks are specified
events = xr.open_dataset(data_path, chunks={'lat': -1, 'lon': -1, 'time': 20})

In [12]:
# read in ram
# events = xr.open_dataset(data_path)

In [13]:
type(events.SSTA.data)

dask.array.core.Array

In [31]:
events.SSTA.data

Unnamed: 0,Array,Chunk
Bytes,835.31 MiB,8.44 MiB
Shape,"(1980, 192, 288)","(20, 192, 288)"
Count,99 Tasks,99 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 835.31 MiB 8.44 MiB Shape (1980, 192, 288) (20, 192, 288) Count 99 Tasks 99 Chunks Type float64 numpy.ndarray",288  192  1980,

Unnamed: 0,Array,Chunk
Bytes,835.31 MiB,8.44 MiB
Shape,"(1980, 192, 288)","(20, 192, 288)"
Count,99 Tasks,99 Chunks
Type,float64,numpy.ndarray


In [14]:
events.chunks

Frozen({'time': (20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20), 'lat': (192,), 'lon': (288,)})

In [15]:
# we will persist the data on the cluster so we do not have to read it multiple times
events = events.persist()

Test with a simple function which returns a scalar output.

In [16]:
@dask.delayed
def calc_cumulativeintensity(event_file, mhw_id):
    # for_one_mhw = event_file.where(event_file.labels==mhw_id, drop=True)
    # cumulative_intensity = np.nansum(for_one_mhw.SSTA)
    cumulative_intensity = np.nansum(event_file.where(event_file.labels==mhw_id, drop=True).SSTA)
    #cumulative_intensity_monthly = for_one_mhw.SSTA.sum(axis=(1,2)).values
    return cumulative_intensity

In [17]:
# obtaining the labels ids (they are 0-1203 so we do not compute them every time)
# mhw_ids = np.unique(events.labels.data)
# mhw_ids = np.unique(events.labels.data).compute()

In [18]:
# np.nanmax(mhw_ids)

In [19]:
# output = calc_cumulativeintensity(events, mhw_ids[0])

In [20]:
# output.compute()

In [21]:
# events_scattered = client.scatter(events)

Create a list of delayed objects for every heatwave:

In [22]:
# measures = [calc_cumulativeintensity(events, i) for i in mhw_ids[:50]]

In [23]:
measures = [calc_cumulativeintensity(events, i) for i in range(1203)]

In [24]:
# measures

In [25]:
%%time
output = dask.compute(measures)

CPU times: user 53.6 s, sys: 1.83 s, total: 55.4 s
Wall time: 2min 21s


In [29]:
len(output[0])

1203