# <span style="color:blue">Target: </span>
## <span style="color:blue">Combine Xarray and Dask for the postprocessing of climate model output</span>

## Xarray
<img src="images/dataset-diagram-logo.png" 
     align="right"
     width="60%"
     alt="Xarray Dataset">
*  **NumPy-like multidimensional arrays**
*  **Enable labels**
*  **Preserve attributes**

In [None]:
import xarray as xr

**what does a xarray look like?**

In [None]:
file="/depot/huberm/data/kong/archive/CORDEX-EAS/PhaseII/historical/1h/tas/tas_EAS-22_MOHC-HadGEM2-ES_historical_r1i1p1_GERICS-REMO2015_v1_1hr_2000-2000.nc"
tas=xr.open_dataset(file)
#xarray.dataset: can contain mulitple data variables
tas

In [None]:
#xarray.DataArray: only one data variable: it's good to do calculations with xarray.DataArray
tas.tas

### Why label is useful?

**indexing by label**

In [None]:
tas.tas.sel(time="2000-01-01T01:00:00").max(["rlat","rlon"]) #spatial maximum of one time-step
#tas.sel(time=slice("2000-01-01","2000-01-02"))

**you can still do indexing by index with isel**

In [None]:
#tas.isel(time=[0])
tas.isel(time=slice(None,3))

**Apply operations over dimensions by name:**

In [None]:
tas.tas.mean('time') #time average (annual avearage in this case) distribution

**array broadcasting based on dimension names; never need to reshape**

In [None]:
tas.tas-tas.tas*2

**split-apply-combine paradigm with groupby: easier to calculate monthly, seasonal, daily, zonal, or longitudinal statistics!**

In [None]:
tas.tas.groupby('time.month').mean() #daily average pattern

In [None]:
import matplotlib.pyplot as plt
tas.tas.mean('time').plot(robust=True)

## Xarray documentation
http://xarray.pydata.org/en/stable/index.html

# Dask
<img src="images/dask-horizontal.svg" 
     align="right"
     width="40%"
     alt="Dask arrays are blocked numpy arrays">

Provides multi-core and distributed ***parallel*** execution on ***larger-than-memory*** datasets.

*  **High level collections:**  Dask Array mimics NumPy arrays but can operate in parallel on datasets that don't fit into memory.
   
*  **Low Level schedulers:** Task schedulers that execute task graphs in parallel. These execution engines power the high-level collections mentioned above but can also power custom, user-defined workloads.  

## Dask Array
<img src="images/dask-array-black-text.svg" 
     align="right"
     width="60%"
     alt="Dask arrays are blocked numpy arrays">
Dask arrays coordinate many Numpy arrays, arranged into chunks within a grid.  They support a large subset of the Numpy API.

In [None]:
import dask
import dask.array as da

**what does a dask array look like?**

In [None]:
file='/depot/huberm/data/kong/archive/CORDEX-EAS/PhaseII/historical/1h/hurs/hurs_EAS-22_MOHC-HadGEM2-ES_historical_r1i1p1_GERICS-REMO2015_v1_1hr_2000-2000.nc'
hurs=xr.open_dataset(file, chunks={"time": 1000})
hurs

**we are actually viewing a dask array within xarray structure; *.data* will let us access dask array directly**

In [None]:
hurs.hurs.data

**let's compare the performance of doing calculations on xarray and dask array (within xarry structure)**

*we expect dask array to be faster thanks to parallel computing*

In [None]:
%%time 
tas.tas.mean('time')

In [None]:
%%time 
hurs.hurs.mean('time').compute()

**why it's even slower?**
**let's try both of them again!**

In [None]:
%%time 
tas.tas.mean('rlat')

In [None]:
%%time 
hurs.hurs.mean('rlat').compute()

both xarray and dask array are faster at the second time. That's because both xarray and dask array are lazy loading. Xarray won't load array values into memory until you perform actual computation (you can manipulate, slice and subset xarray without loading into memory). In fact Dask is even lazier; it won't load anything until we explicitely call .compute()


if memeory is not a problem (for example when you are using a cluster), you can load your data first (*dataset.load()* for xarry; *dask.array.persist()* for Dask) which will distribute your data into cluster memory. Afterwards, computation would be faster!

Dask performs worse at the first try, since we need to load the data chunk by chunk

However, dask didn't show better performance even at the second try which probably indicate that the extra overhead of using dask makes it not suitable for this *small* problem


In [None]:
tas.persist()

**The power of xarray and dask can be well combined**

In [None]:
tas=tas.chunk({"time": 500}) #convert tas to dask array
tas_by_season = tas.tas.groupby('time.season').mean('time')
tas_range = abs(tas_by_season.sel(season='JJA')- tas_by_season.sel(season='DJF'))
%time tas_range.load()

# Delayed
parallelize custom algorithms using the simpler dask.delayed interface

In [None]:
import time
def inc(x):
    time.sleep(1)
    return x + 1

def double(x):
    time.sleep(1)
    return x * 2

def add(x, y):
    time.sleep(1)
    return x + y


In [None]:
%%time
data = [1, 2, 3]
output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)
total = sum(output)
print(total)

In [None]:
%%time
data = [1, 2, 3]
output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    c = dask.delayed(add)(a, b)
    output.append(c)
total = dask.delayed(sum)(output)
total.compute()

In [None]:
total.visualize() #dask is drawing this graph when the actual computation is delayed. 

# dask.delayed best practice:
https://docs.dask.org/en/latest/delayed-best-practices.html

### Don’t call dask.delayed on Dask Arrays
When you place a Dask array into a delayed call, that function will receive the NumPy equivalent. Beware that if your array is large, then this might crash your workers.
### Use *da.map_blocks*, *xr.apply_ufunc*, or *xr.map_blocks* instead to apply customized function to dask arrays chunk by chunk

In [None]:
def viscosity(tas):
    omega = (tas / 97 - 2.9) / 0.4 * (-0.034) + 1.048
    viscosity = 0.0000026693 * ((28.97 * tas)**0.5) / (3.617**2 * omega)
    return viscosity

v1=da.map_blocks(viscosity,tas.tas.data)
v2=xr.apply_ufunc(viscosity,tas.tas,dask="parallelized",output_dtypes=[float])
v3=xr.map_blocks(viscosity,tas.tas)
v4=viscosity(tas.tas)

In [None]:
%time v1.compute()

In [None]:
%time v2=v2.compute()

In [None]:
%time v3=v3.compute()

In [None]:
%time v4=v4.compute()

# Best practice of Dask
https://docs.dask.org/en/latest/best-practices.html

# Scheduler
* **Single machine scheduler:** Default scheduler, can only be used on a single machine. If you import Dask, set up a computation, and then call compute, then you will use the single-machine scheduler by default. ***We use single-machine scheduler above by default!***


* **Distributed scheduler:** can run on a single machine or distributed across a cluster, **should be preferred even on a single machine** (offer more features like diagnostic). To use the dask.distributed scheduler you must set up a Client
```python
from dask.distributed import Client
client = Client(...) 
df.x.sum().compute()
```

In [None]:
from dask.distributed import Client
client=Client()#sets up a scheduler in your local process and several processes running single-threaded Workers.
client

In [None]:
#tas=tas.chunk({"time": 1000})
v=da.map_blocks(viscosity,tas.tas.data)

In [None]:
%time v.compute()

In [None]:
client.close()
client

In [None]:
# single process with multiple threads
client = Client(processes=False)
client

In [None]:
v=da.map_blocks(viscosity,tas.tas.data)
%time v.compute()

single process with multiple threads works better for numeric computations like numpy who releasing GIL. But for other calculations don't release GIL, the parallel capability of multi-thread disappear! Use multi-process instead.

In [None]:
client.close()
client

In [None]:
#multiple processes each with single thread
client = Client(n_workers=2,threads_per_worker=12) 
client

In [None]:
v=da.map_blocks(viscosity,tas.tas.data)
%time v.compute()

In [None]:
#multiple processes each with single thread
client = Client(n_workers=24,threads_per_worker=1) 
client

In [None]:
%time v.compute()

each process has its own memory which solve the GIL issue. But, the memory of each process may be not enough for a big chunk size.

# Distributed across cluster: multiple nodes

### The preferred way to run Dask on HPC systems is to use *dask-jobqueue*. 

In [None]:
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(
    queue="huberm",
    cores=24,
    processes=1,
    local_directory='/scratch/brown/kong97/tmp',
    project="huberm",
    memory="80 GB",
    walltime="01:00:00",
    interface='ib0'
)

In [None]:
cluster.scale(jobs=2)

In [None]:
print(cluster.job_script())

In [None]:
client = Client(cluster)
client

In [None]:
client

In [None]:
tas_rechunk2 = tas.chunk({"time":100})
v_rechunk2=da.map_blocks(viscosity,tas_rechunk2.tas.data)

In [None]:
%time v_rechunk2.compute()

In [None]:
!ls /depot/huberm/data/kong/archive/CORDEX-EAS/PhaseII/historical/1h/sfcwind/

In [None]:
sfcwind=xr.open_mfdataset('/depot/huberm/data/kong/archive/CORDEX-EAS/PhaseII/historical/1h/sfcwind/*MOHC-HadGEM2-ES*.nc', parallel=True)
sfcwind.sfcWind

In [None]:
sfcwind.sfcWind.chunks

# Numba: *@njit @vectorize*
### *@njit decorator*: compile your python code into machine code just-in-time

### *@vectorize*: Using @vectorize, you write your function as operating over input scalars, rather than arrays. Numba will generate the surrounding loop (or kernel) allowing efficient iteration over the actual inputs.


In [None]:
from numba import njit, vectorize
import numpy as np

In [None]:
def f1(x,y):
    return x*x+np.exp(y)-3

In [None]:
@njit
def f2(x,y):
    return x*x+np.exp(y)-3

In [None]:
a=np.random.randn(1000, 1000,100)
b=np.random.randn(1000, 1000,100)

In [None]:
%time f1(a,b)

In [None]:
%time f2(a,b)

In [None]:
a=np.random.randn(1000, 1000,10)
b=np.random.randn(1000, 1000,10)

In [None]:
def f3(x,y):
    z=np.zeros((1000,1000,10))
    for i in np.arange(0, a.shape[0]):
        for j in np.arange(0, a.shape[1]):
            for k in np.arange(0, a.shape[2]):
                z[i,j,k]=x[i,j,k]+y[i,j,k]
    return z

In [None]:
@njit
def f4(x,y):
    z=np.zeros((1000,1000,10))
    for i in np.arange(0, a.shape[0]):
        for j in np.arange(0, a.shape[1]):
            for k in np.arange(0, a.shape[2]):
                z[i,j,k]=x[i,j,k]+y[i,j,k]
    return z

In [None]:
%time f3(a,b)

In [None]:
%time f4(a,b)

In [None]:
def g1(x,y):
    if x>y:
        return x+y
    else:
        return x-y
@vectorize(nopython=True)
def g2(x,y):
    if x>y:
        return x+y
    else:
        return x-y
g3=np.vectorize(g1)

In [None]:
%time g3(a,b)

In [None]:
%time g2(a,b)