<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#ESGF-Distributed-array-example" data-toc-modified-id="ESGF-Distributed-array-example-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>ESGF Distributed array example</a></span><ul class="toc-item"><li><span><a href="#Import-CDMS-and-Dask-environment" data-toc-modified-id="Import-CDMS-and-Dask-environment-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Import CDMS and Dask environment</a></span></li><li><span><a href="#Create-a-Cluster" data-toc-modified-id="Create-a-Cluster-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>Create a Cluster</a></span></li><li><span><a href="#Read-salinity-data-from-CMIP5" data-toc-modified-id="Read-salinity-data-from-CMIP5-1.3"><span class="toc-item-num">1.3&nbsp;&nbsp;</span>Read salinity data from CMIP5</a></span></li><li><span><a href="#Convert-cdms-variable-into-a-dask_array-variable" data-toc-modified-id="Convert-cdms-variable-into-a-dask_array-variable-1.4"><span class="toc-item-num">1.4&nbsp;&nbsp;</span>Convert cdms variable into a dask_array variable</a></span></li><li><span><a href="#Compute-the-mean" data-toc-modified-id="Compute-the-mean-1.5"><span class="toc-item-num">1.5&nbsp;&nbsp;</span>Compute the mean</a></span></li><li><span><a href="#Display-the-resut" data-toc-modified-id="Display-the-resut-1.6"><span class="toc-item-num">1.6&nbsp;&nbsp;</span>Display the resut</a></span></li><li><span><a href="#Create-colormap" data-toc-modified-id="Create-colormap-1.7"><span class="toc-item-num">1.7&nbsp;&nbsp;</span>Create colormap</a></span></li><li><span><a href="#Convert-results-back-to-CDMS" data-toc-modified-id="Convert-results-back-to-CDMS-1.8"><span class="toc-item-num">1.8&nbsp;&nbsp;</span>Convert results back to CDMS</a></span></li><li><span><a href="#Plot-result" data-toc-modified-id="Plot-result-1.9"><span class="toc-item-num">1.9&nbsp;&nbsp;</span>Plot result</a></span></li></ul></li><li><span><a href="#Numba-Example" data-toc-modified-id="Numba-Example-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Numba Example</a></span><ul class="toc-item"><li><span><a href="#Serial" data-toc-modified-id="Serial-2.1"><span class="toc-item-num">2.1&nbsp;&nbsp;</span>Serial</a></span></li><li><span><a href="#Parallel" data-toc-modified-id="Parallel-2.2"><span class="toc-item-num">2.2&nbsp;&nbsp;</span>Parallel</a></span></li></ul></li><li><span><a href="#Run-reductions" data-toc-modified-id="Run-reductions-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Run reductions</a></span><ul class="toc-item"><li><span><a href="#First-Run-Serial-reduction" data-toc-modified-id="First-Run-Serial-reduction-3.1"><span class="toc-item-num">3.1&nbsp;&nbsp;</span>First Run Serial reduction</a></span></li><li><span><a href="#Second-Run-paralell-reduction" data-toc-modified-id="Second-Run-paralell-reduction-3.2"><span class="toc-item-num">3.2&nbsp;&nbsp;</span>Second Run paralell reduction</a></span></li></ul></li></ul></div>

# ESGF Distributed array example

## Import CDMS and Dask environment

In [None]:
import cdms2
import numpy as np
import dask.array.ma as dma
import dask.array as da
import dask
from dask_kubernetes import KubeCluster
from dask.distributed import LocalCluster
from dask.distributed  import Client

## Create a Cluster
* 5 workers
* 2 threads

In [None]:
cluster=LocalCluster(n_workers=5,threads_per_worker=2)
display(cluster)
c=Client(cluster)
c

## Read salinity data from CMIP5

In [None]:
!wget http://cdat.llnl.gov/cdat/sample_data/so_Omon_ACCESS1-0_historical_r1i1p1_185001-185412_2timesteps.nc

In [None]:
#f=cdms2.open("ta_day_CNRM-CM6-1_piControl_r1i1p1f2_gr_18500101-18991231.nc")
# f=cdms2.open("ta_day_CNRM-CM6-1_piControl_r1i1p1f2_gr.xml")
f=cdms2.open("so_Omon_ACCESS1-0_historical_r1i1p1_185001-185412_2timesteps.nc")
dataDV=f['so']
print(dataDV.shape)
dataDV


## Convert cdms variable into a dask_array variable

`cdms` includes a serializer/deserializer function that is registered to dask

In [None]:
# dmaTV=dma.masked_array(dataTV)

daDV=da.from_array(dataDV,chunks=(1,1,300,360),asarray=False)
daDV

## Compute the mean
The mean is computed on each data point along the time axis.

In [None]:
%time result = da.mean(daDV, axis=0).compute()
print(result.shape)
result



## Display the resut

`VCS` is a package which facilitates visualization for Climate data

In [None]:
import vcs
canvas=vcs.init()

## Create colormap

Salinity between 30psu and 40psu will be rendered

In [None]:
cmap = vcs.createcolormap("my_colormap9", "rainbow")
box = vcs.createboxfill()
box.level_1=30
box.level_2=40

## Convert results back to CDMS
The result is a numpy masked array, we need to add back the grid and the attributes

(This will be done automatically in the next release.)

In [None]:
axes=dataDV.getAxisList()
resultsCDMS=cdms2.createVariable(result[1,:],axes=axes[2:])
[resultsCDMS.setattribute(name, dataDV.getattribute(name)) for name in dataDV.listattributes() if name not in ['ndim']]
resultsCDMS.id='so'

## Plot result

In [None]:
canvas.clear()
canvas.setcolormap(cmap)
canvas.plot(resultsCDMS,box, long_name="Ocean Salinity", units="psu")

# Numba Example

## Serial 

In [None]:
import numpy as np
from numba import njit, prange
def two_d_array_reduction_prod_serial(n):
    shp = (13, 17)
    result1 = 2 * np.ones(shp, np.int_)
    tmp = 2 * np.ones_like(result1)
    for i in prange(n):
        result1 *= tmp
    return result1


## Parallel

In [None]:
@njit(parallel=True)
def two_d_array_reduction_prod_parallel(n):
    shp = (13, 17)
    result1 = 2 * np.ones(shp, np.int_)
    tmp = 2 * np.ones_like(result1)
    for i in prange(n):
        result1 *= tmp
    return result1


# Run reductions

## First Run Serial reduction

In [None]:
import time
stime = time.time()
%time two_d_array_reduction_prod_serial(5000000)
etime = time.time()
durationSerial = etime - stime
print("duration:",  durationSerial)

## Second Run paralell reduction

In [None]:
import time
# compile first
two_d_array_reduction_prod_parallel(1)
stime = time.time()
%time two_d_array_reduction_prod_parallel(5000000)
etime = time.time()
durationPar = etime - stime
print("duration:", durationPar )
print(((durationSerial-durationPar)/durationSerial)*100,"% increase")

In [None]:
import numpy.ma
import numpy
import dask
import dask.array.ma
from dask.distributed import Client
from distributed.deploy.local import LocalCluster

rand_data= numpy.random.random((20,18,365))
ma_rand_data= numpy.ma.MaskedArray(rand_data,chunks=(1,18,365))
da_rand_data = dask.array.ma.masked_array(ma_rand_data)
print(da_rand_data)
dask.array.average(da_rand_data).compute()