# Tutorial: Moving from Single Jobs to Many Nodes: Dask, xarray, and Pangeo, Part 2

This is the second of a two-notebook series introducing basic concepts for moving xarray workflows from single-machine to many-machine systems. This material is adapted from the excellent tutorial developed by [Ryan Abernathey, Joe Hamman, and Scott Henderson from the AGU 2018 Fall Meeting](https://github.com/pangeo-data/pangeo-tutorial-agu-2018/).

--- 

Initial setup matter

In [None]:
%matplotlib inline
from matplotlib import pyplot as plt
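# matplotlib >= 3.6 renamed its bundled seaborn styles to 'seaborn-v0_8-*'
try:
    plt.style.use(['seaborn-v0_8-ticks', 'seaborn-v0_8-talk'])
except OSError:
    plt.style.use(['seaborn-ticks', 'seaborn-talk'])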

import numpy as np
import xarray as xr

## Towards many-core workflows

One of xarray's most useful features is its native integration with **dask**, a distributed computing library featuring an array object that implements much of the NumPy API. A `dask.array` is essentially a large array composed of many smaller NumPy arrays:

<img src="http://dask.pydata.org/en/latest/_images/dask-array-black-text.svg">
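To make the "array of arrays" picture concrete, here is a minimal NumPy-only sketch. It is not Dask's implementation. It splits a small array into a 2 × 2 grid of chunks, reduces each chunk independently, and then combines the partial results. The array and chunk sizes are arbitrary choices for illustration.

```python
import numpy as np

# A small stand-in for a large array: 6 x 6, split into 3 x 3 chunks.
# (Dask would hold each chunk as a separate NumPy array, possibly on different workers.)
x = np.arange(36, dtype=float).reshape(6, 6)
chunk = (3, 3)

# Build a 2 x 2 grid of chunks, mirroring how a dask.array is a grid of NumPy arrays
blocks = [
    [x[i:i + chunk[0], j:j + chunk[1]] for j in range(0, x.shape[1], chunk[1])]
    for i in range(0, x.shape[0], chunk[0])
]

# Reduce each chunk independently, then combine the partial results
partial_sums = [[b.sum() for b in row] for row in blocks]
total = sum(sum(row) for row in partial_sums)

# Reassembling the chunks recovers the original array
reassembled = np.block(blocks)
```

The per-chunk `sum()` calls have no dependencies on each other, so a distributed scheduler can run them in parallel. Only the final combination step needs all of the partial results.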

Setting up a proper Dask workflow system isn't very complicated, but it's beyond the scope of this mini-tutorial. The Pangeo community has put together [several sets of documentation on how you can deploy a Dask system on both traditional HPC/cluster setups as well as on the cloud](http://pangeo.io/setup_guides/index.html).

If you're running this notebook on Binder, we've already got you covered. We're using a tool called **dask-kubernetes**, which relies on Kubernetes, a popular container orchestration tool, to spin up Dask workers as users need them. Normally, you could run the following code to set up a **dask-kubernetes** `KubeCluster` with a fixed number of workers:

``` python
from dask_kubernetes import KubeCluster
cluster = KubeCluster(n_workers=10)
cluster
```

Because of a quirk in this setup, we'll instead create the cluster manually using the built-in Dask JupyterLab extension:

1. Click the dask icon on the left-hand panel (the orange wings)
2. Click on "+ NEW"; this should create a "KubeCluster" in the panel underneath it
3. Note the *Scheduler Address* ("tcp://< ip >:< port >"); we'll use that below.
4. *Scale* the number of workers/cores on the cluster
5. Open the Dask dashboard panes you'd like to monitor before continuing.
    
Now, we can connect a client to the `KubeCluster`; be sure to fill in the "ip" and "port" strings below. 

In [None]:
from dask.distributed import Client, progress

# If executing on Binder...
# client = Client('<ip>:<port>')

# ... else, if executing locally.
client = Client()
client

### Simple Dask array examples

The following calculations give a feel for how Dask distributes array calculations. We can create a Dask array just like we might a NumPy array. The major difference is that we specify the *chunks* that we want to break the array into.
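Chunk sizes matter because each chunk is a unit of work that a worker holds in memory. Here is a quick back-of-the-envelope check for the array created below. It assumes float64 values (8 bytes each), the default dtype of `da.random.random`.

```python
# Back-of-the-envelope chunk arithmetic for a float64 array
shape = (20000, 20000)
chunks = (2000, 2000)
itemsize = 8  # bytes per float64 value

# Number of chunks along each axis (ceiling division handles uneven splits)
blocks_per_axis = tuple(-(-n // c) for n, c in zip(shape, chunks))
n_chunks = blocks_per_axis[0] * blocks_per_axis[1]

chunk_bytes = chunks[0] * chunks[1] * itemsize
total_bytes = shape[0] * shape[1] * itemsize

print(blocks_per_axis, n_chunks)          # (10, 10) 100
print(chunk_bytes / 1e6, "MB per chunk")  # 32.0 MB per chunk
print(total_bytes / 1e9, "GB in total")   # 3.2 GB in total
```

A 3.2 GB array split this way is handled as 100 independent 32 MB pieces. Each piece fits comfortably in a single worker's memory.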

In [None]:
import dask.array as da
x = da.random.random((20000, 20000), chunks=(2000, 2000)).persist()
x

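Note that when we print the Dask array, we get a summary of what the variable contains (its shape, data type, and chunking), not the values of the array itself. This is because Dask *defers* its calculations until the user explicitly asks for a result, for example with `.compute()`. Until then, Dask tracks a graph representing the calculation as it is built, and it tries to optimize that graph before executing it. The `.persist()` call above is one way of triggering execution: it computes the chunks and keeps them in the cluster's memory so that later calculations can reuse them.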
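Dask represents a deferred computation as a task graph. This is a mapping from keys to either plain values or tasks, where a task is a tuple of a function and its arguments. The sketch below builds a tiny graph in that style and evaluates it with a naive recursive evaluator. The `get` function here is a hypothetical toy, not Dask's scheduler. It shows that building the graph does no work; only requesting a key triggers evaluation.

```python
from operator import add, mul

# A hypothetical miniature task graph in the style of Dask's graph spec:
# keys map either to plain values or to tuples of (function, *arguments),
# where string arguments that are keys refer to other tasks.
graph = {
    "a": 2,
    "b": 3,
    "c": (add, "a", "b"),  # c = a + b
    "d": (mul, "c", 10),   # d = c * 10
}

def get(graph, key):
    """Naive recursive evaluator: nothing runs until a result is requested."""
    task = graph[key]
    if isinstance(task, tuple) and callable(task[0]):
        func, *args = task
        # Resolve any argument that names another task in the graph
        resolved = [get(graph, arg) if isinstance(arg, str) and arg in graph else arg
                    for arg in args]
        return func(*resolved)
    return task

result = get(graph, "d")  # evaluation happens only here
print(result)  # 50
```

A real scheduler also walks this dependency structure. Unlike the toy above, it can run independent tasks in parallel across workers and drop intermediate results once they are no longer needed.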

In [None]:
x[0, :5].compute()

In [None]:
x.sum().compute()

We can have Dask execute fairly complex calculations for us:

In [None]:
y = x + x.T - x.mean(axis=0)
y = y.persist()

In [None]:
y[::5, ::5].compute()
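Reductions such as `x.mean(axis=0)` in the cell above have to combine information from every chunk. One common strategy is to compute a partial sum and a count per chunk, then combine them. The sketch below shows this in plain NumPy on a small square array. It is a simplified illustration, not Dask's actual reduction code.

```python
import numpy as np

# Small square stand-in for x (the real x is 20000 x 20000)
rng = np.random.default_rng(0)
x = rng.random((6, 6))
row_chunk = 2  # split the rows into 3 chunks of 2 rows each

# Each chunk contributes a partial column sum and a row count...
partials = [(x[i:i + row_chunk].sum(axis=0), x[i:i + row_chunk].shape[0])
            for i in range(0, x.shape[0], row_chunk)]

# ...and the partial results are combined into the column means
col_sums = sum(s for s, _ in partials)
n_rows = sum(n for _, n in partials)
blockwise_mean = col_sums / n_rows

# The expression from the cell above, using the blockwise mean
y = x + x.T - blockwise_mean
```

Combining sums and counts, rather than averaging the per-chunk means, keeps the result correct even when chunks have different sizes.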

### Dask and xarray

`xarray` has built-in functionality to lean on `dask`. Let's open our multi-file dataset one more time:

In [None]:
ds_all = xr.open_mfdataset('../data/sst/*.nc')
ds_all

We skipped over this point before, but look at the printout for the **sst** variable. It doesn't report any numerical values; instead, it gives a summary similar to the one we saw when we manually created Dask arrays earlier.

By default, when we call **open_mfdataset()**, xarray instructs dask to hold each file's contents as an individual chunk. Our dataset has 57 annual cycles of monthly data (684 time steps), each contained in a single file, so xarray/dask automatically chunks the data like:

    float dask.array<shape=(684, 89, 180), chunksize=(12, 89, 180)>
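The chunk layout above follows directly from the file structure. The sketch below reproduces that bookkeeping using the same tuple-of-tuples convention that `dask.array` uses for its `.chunks` attribute. The file count and grid size come from the text. The variable names are just for illustration.

```python
# Chunk bookkeeping for the SST dataset described above:
# 57 files, each holding one year (12 months) on an 89 x 180 lat/lon grid
n_files, months_per_file = 57, 12
n_lat, n_lon = 89, 180

# One chunk per file along time; lat and lon are not split
time_chunks = (months_per_file,) * n_files
chunks = (time_chunks, (n_lat,), (n_lon,))

n_time = sum(time_chunks)                            # total time steps
values_per_chunk = months_per_file * n_lat * n_lon   # values in each chunk
print(n_time, len(time_chunks), values_per_chunk)    # 684 57 192240
```

If one chunk per file is not a good fit for a workflow, `open_mfdataset` also accepts a `chunks=` argument, a dict keyed by dimension name, to control the chunk sizes.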

Let's repeat the weighted average calculation from before, but observe what happens now that we have a chunked dataset:

In [None]:
sst = ds_all.sst

Compute the grid cell areas like before.

In [None]:
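R = 6.37e6  # Earth's radius in meters
# infer the grid spacing (in degrees) from the coordinates rather than hard-coding it
dϕ = np.deg2rad(abs(float(ds_all.lat[1] - ds_all.lat[0])))
dλ = np.deg2rad(abs(float(ds_all.lon[1] - ds_all.lon[0])))
# area of each grid cell (m^2); it shrinks toward the poles with cos(latitude)
dA = R**2 * dϕ * dλ * np.cos(np.deg2rad(ds_all.lat))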
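As an extra sanity check on the area formula `dA = R**2 * cos(ϕ) * dϕ * dλ`, consider a grid that covers the whole globe: its cell areas should add up to the surface area of a sphere, `4 * pi * R**2`. The sketch below checks this with plain NumPy on a hypothetical 1° global grid, not the SST grid itself.

```python
import numpy as np

R = 6.37e6  # Earth's radius in meters, as above

# Hypothetical global 1-degree grid with cell centers at -89.5, -88.5, ..., 89.5
lat = np.arange(-89.5, 90, 1.0)
lon = np.arange(0.5, 360, 1.0)
dphi = dlam = np.deg2rad(1.0)

# Cell area depends only on latitude; broadcast it across all longitudes
dA = R**2 * dphi * dlam * np.cos(np.deg2rad(lat))
area_2d = np.broadcast_to(dA[:, None], (lat.size, lon.size))

total_area = area_2d.sum()
sphere_area = 4 * np.pi * R**2
print(total_area / sphere_area)  # very close to 1
```

The small remaining difference comes from approximating the integral of `cos(ϕ)` by a sum over cell centers.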

As a sanity check, plot the grid cell areas on a 2D image.

In [None]:
pixel_area = dA.where(sst[0].notnull())
pixel_area.plot()

Now, compute a timeseries of weighted global average SSTs.

In [None]:
total_ocean_area = pixel_area.sum(dim=('lon', 'lat'))
sst_weighted_mean = (sst * pixel_area).sum(dim=('lon', 'lat')) / total_ocean_area
sst_weighted_mean

In [None]:
sst_weighted_mean.plot()

Now we have 57 years of data instead of one!
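Masking the cell areas with `.where(sst[0].notnull())` is what keeps land out of the denominator. xarray's `.sum()` skips NaN values by default for floating-point data, so land cells contribute neither SST nor area. The NumPy sketch below uses a hypothetical 2 × 3 field to show the same idea and what happens without the mask.

```python
import numpy as np

# Hypothetical 2 x 3 field: NaN marks land, where SST is undefined
sst = np.array([[20.0, np.nan, 24.0],
                [10.0, 12.0, np.nan]])
# Hypothetical cell areas (e.g. smaller cells at higher latitude)
area = np.array([[1.0, 1.0, 1.0],
                 [0.5, 0.5, 0.5]])

# Like dA.where(sst.notnull()): keep area only where there is ocean
ocean_area = np.where(np.isnan(sst), np.nan, area)

# Area-weighted mean over ocean cells only; nansum skips land
weighted_mean = np.nansum(sst * ocean_area) / np.nansum(ocean_area)

# Without masking, land cells inflate the denominator and bias the mean low
unmasked = np.nansum(sst * area) / area.sum()
```

Here the masked result is 55 / 3 ≈ 18.3. The unmasked version divides the same numerator by 4.5 and gives about 12.2.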

### Groupby

Now that we have a bigger dataset, this is a good time to check out xarray's groupby capabilities.

In [None]:
sst_clim = ds_all.sst.groupby('time.month').mean(dim='time')
sst_clim

Now the data has dimension `month` instead of time!
Each value represents the average among all of the Januaries, Februaries, etc. in the dataset.
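Conceptually, grouping by `time.month` collects every value that shares a month label and averages them. The NumPy sketch below does this on a hypothetical monthly series and checks it against a reshape, which gives the same answer when the data start in January and cover complete years. Note that, after grouping, position 6 holds month 7 (July), not June.

```python
import numpy as np

# Hypothetical monthly series: 3 full years starting in January
n_years = 3
months = np.tile(np.arange(1, 13), n_years)    # 1, 2, ..., 12, 1, 2, ...
values = np.arange(n_years * 12, dtype=float)  # stand-in for SST

# groupby('time.month').mean(): average all values that share a month label
clim = np.array([values[months == m].mean() for m in range(1, 13)])

# With complete calendar years, the same result comes from a reshape
clim_reshape = values.reshape(n_years, 12).mean(axis=0)

# Positional index 6 holds month 7 (July); index 0 holds month 1 (January)
july_minus_january = clim[6] - clim[0]
```

Selecting by label, e.g. `sst_clim.sel(month=7)`, avoids this off-by-one trap entirely.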

In [None]:
# select by month label (7 = July, 1 = January) rather than by position
(sst_clim.sel(month=7) - sst_clim.sel(month=1)).plot()
plt.title('July minus January SST Climatology')

### Resample and Rolling

Resample is meant specifically to work with time data (data with a `datetime64` variable as a dimension).
It allows you to change the time-sampling frequency of your data.
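The `groupby('time.month')` climatology above averages *across* years for each month. Downsampling to annual means instead averages *within* each year. The NumPy sketch below illustrates this on a hypothetical series that starts in January and covers complete calendar years. It is a simplification: xarray's `resample` also handles partial years and attaches time labels to the new bins.

```python
import numpy as np

# Hypothetical monthly series covering 2 complete calendar years
monthly = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
                    13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24], dtype=float)

# Downsampling to annual means: average the 12 months within each year
annual = monthly.reshape(-1, 12).mean(axis=1)
```

Each row of the reshaped array is one year, so `mean(axis=1)` yields one value per year (here 6.5 and 18.5).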

Let's illustrate by selecting a single point.

In [None]:
sst_ts = ds_all.sst.sel(lon=300, lat=10)
sst_ts_annual = sst_ts.resample(time='A').mean(dim='time')
sst_ts_annual

In [None]:
sst_ts.plot()
sst_ts_annual.plot()

An alternative approach is a "running mean" over the time dimension.
This can be accomplished with xarray's `.rolling` operation.
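A centered running mean replaces each point with the average of a window around it. The NumPy sketch below mirrors the behavior of xarray's `rolling(..., center=True).mean()` with its default `min_periods` (the full window is required), so the edges become NaN. The helper `centered_rolling_mean` is hypothetical. It only handles odd windows, because even windows such as the 24-month window used below need a convention for which side gets the extra point, and this sketch does not reproduce that convention.

```python
import numpy as np

def centered_rolling_mean(x, window):
    """Centered running mean for an odd window; NaN where the window is incomplete."""
    if window % 2 == 0:
        raise ValueError("this sketch only handles odd window sizes")
    # 'valid' convolution returns one mean per position where the full window fits
    valid = np.convolve(x, np.ones(window) / window, mode="valid")
    out = np.full(x.shape, np.nan)
    half = window // 2
    # Place each mean at the center of its window
    out[half:half + valid.size] = valid
    return out

x = np.arange(10, dtype=float)
smoothed = centered_rolling_mean(x, 3)
```

For a linear series like `x`, the centered mean at each interior point equals the point itself. Only the first and last values, where the window does not fit, are NaN.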

In [None]:
sst_ts_rolling = sst_ts.rolling(time=24, center=True).mean()
sst_ts_annual.plot(marker='o')
sst_ts_rolling.plot()