In [1]:
%matplotlib inline
import intake
import xarray as xr
import matplotlib
import pandas as pd
import cartopy.crs as ccrs
import seaborn as sns
from esio import ice_plot
import dask

In [2]:
def test_plot(ds_SIT):
    """Draw a map of sea ice thickness averaged over model and time dims.

    The ``mean_25km_1km`` variable of ``ds_SIT`` is averaged over the
    ``model``, ``init_end`` and ``fore_time`` dimensions, then drawn with
    ``pcolormesh`` (using ``lon``/``lat`` as plot coordinates) on the
    figure/axis pair obtained from ``ice_plot.polar_axis()``.

    Nothing is returned; the function is called for its plotting side effect.
    """
    # Ten-colour reversed "Blues" palette; "bad" (masked/invalid) cells are light grey.
    palette = sns.color_palette("Blues_r", 10)
    thickness_cmap = matplotlib.colors.ListedColormap(palette)
    thickness_cmap.set_bad(color='lightgrey')

    fig, map_ax = ice_plot.polar_axis()
    fig.set_size_inches(10, 5)

    # Collapse every non-spatial dimension before plotting.
    averaged_dims = ['model', 'init_end', 'fore_time']
    mean_thickness = ds_SIT.mean_25km_1km.mean(dim=averaged_dims)

    plot_options = dict(
        ax=map_ax,
        x='lon',
        y='lat',
        transform=ccrs.PlateCarree(),
        cmap=thickness_cmap,
        cbar_kwargs={'label': 'Sea Ice Thickness (m)'},
    )
    mean_thickness.plot.pcolormesh(**plot_options)

### Default scheduler

In [None]:
# Open the SIPN2_SIT entry of the local intake catalog via to_dask().
ds_SIT = intake.Catalog('./catalog.yaml').SIPN2_SIT.to_dask()
# Disabled experiment: rechunk the dataset with the explicit chunk sizes below.
# ds_SIT = ds_SIT.chunk({'fore_time': 72, 'init_end': 56, 'model': 6, 'x': 304, 'y': 448})

In [None]:
# Time a full reduction: mean of mean_25km_1km over every dimension, with
# .values pulling the result into memory.
%time ds_SIT.mean_25km_1km.mean().values

In [None]:
# Time the averaged spatial plot. When the notebook is run top to bottom, no
# dask scheduler has been configured explicitly at this point.
%time test_plot(ds_SIT)

### Multi-threaded scheduler

In [3]:
# Select dask's threaded scheduler for the computations that follow.
dask.config.set(scheduler='threads')

<dask.config.set at 0x7ff7e8fd2e48>

In [4]:
# Re-open the SIPN2_SIT catalog entry and display the dataset summary
# (dimensions, coordinates and dask chunking of each variable).
ds_SIT = intake.Catalog('./catalog.yaml').SIPN2_SIT.to_dask()
ds_SIT

<xarray.Dataset>
Dimensions:        (fore_time: 72, init_end: 56, model: 6, x: 304, y: 448)
Coordinates:
  * fore_time      (fore_time) timedelta64[ns] 0 days 1 days 2 days 3 days ...
  * init_end       (init_end) datetime64[ns] 2018-02-01 2018-02-06 ...
    lat            (x, y) float64 dask.array<shape=(304, 448), chunksize=(152, 224)>
    lon            (x, y) float64 dask.array<shape=(304, 448), chunksize=(152, 224)>
  * model          (model) object 'Observed' 'gfdlsipn' 'rasmesrl' ...
Dimensions without coordinates: x, y
Data variables:
    mean_25km_1km  (init_end, model, fore_time, x, y) float64 dask.array<shape=(56, 6, 72, 304, 448), chunksize=(1, 1, 1, 304, 448)>

In [None]:
# Time the mean over every dimension; .values forces the dask-backed result
# to be computed.
%time ds_SIT.mean_25km_1km.mean().values

In [None]:
# Time the averaged spatial plot for this section's scheduler setting.
%time test_plot(ds_SIT)

### Single Machine

In [None]:
# Create a dask.distributed client with default arguments and display it.
# Per the dask.distributed docs, Client() with no address starts a local
# cluster on this machine.
# NOTE(review): this import lives in the cell rather than in the top import cell.
from dask.distributed import Client
client = Client()
client

In [None]:
# Re-open the SIPN2_SIT catalog entry and display the dataset summary.
ds_SIT = intake.Catalog('./catalog.yaml').SIPN2_SIT.to_dask()
ds_SIT

In [None]:
# Time the averaged spatial plot for the single-machine section.
%time test_plot(ds_SIT)

### Distributed Cluster

In [None]:
# Create a KubeCluster asking for 20 workers and display it.
# NOTE(review): n_workers=20 is hard-coded — presumably sized for one specific
# deployment; confirm before reuse.
# NOTE(review): `progress` is imported here but not used in the cells shown.
from dask.distributed import Client, progress
from dask_kubernetes import KubeCluster
cluster = KubeCluster(n_workers=20)
cluster

In [None]:
# Connect a client to the Kubernetes cluster and display it. This rebinds the
# name `client`.
# NOTE(review): a client created earlier in the notebook is not closed before
# being replaced — confirm whether it should be shut down first.
client = Client(cluster)
client

In [None]:
# Re-open the SIPN2_SIT catalog entry and display the dataset summary.
ds_SIT = intake.Catalog('./catalog.yaml').SIPN2_SIT.to_dask()
ds_SIT

In [None]:
# Time the averaged spatial plot for the distributed-cluster section.
%time test_plot(ds_SIT)