In [1]:
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import xarray as xr
import cartopy.crs as ccrs
import os
import time
from glob import glob

mpl.rcParams['figure.figsize'] = 12, 8
mpl.rcParams['font.size'] = 18

## In this notebook, I'll show a simple chunking example.

### Chunking allows python to operate on an array across multiple CPUs in parallel, by breaking it up into "chunks" that can be operated on independently. 

### There are a few rules of thumb for setting chunks, and using Dask with xarray in general, here:http://xarray.pydata.org/en/stable/dask.html

### Lets start a dask cluster with 10 cpus

In [2]:
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(project='UWAS0052')
cluster.scale(10)

from dask.distributed import Client
client = Client(cluster)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 39983 instead
  http_address["port"], self.http_server.port


0,1
Client  Scheduler: tcp://10.12.205.30:45509  Dashboard: http://10.12.205.30:39983/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


### Next I'll load some example data from the CESM LENS to play with. 

The preprocess function is to get around some weird problem with the LENS latitudes, which don't match exactly across ensemble members and so load wrong.

In [5]:
def preprocess(ds):
    ds = ds.drop('lat')
    return ds

In [6]:
ddir = '/glade/collections/cdg/data/CLIVAR_LE/cesm_lens/Amon/tas/'
dfiles = sorted(glob(ddir + '*rcp85*.nc'))

mfds = xr.open_mfdataset(dfiles,
                         combine='nested',
                         concat_dim='ensmem',
                         preprocess=preprocess,
                         parallel=True)

ds = xr.open_dataset(dfiles[0])
lat = ds.lat

mfds = mfds.assign_coords({'lat': lat})
mfds

Unnamed: 0,Array,Chunk
Bytes,122.88 kB,3.07 kB
Shape,"(40, 192, 2)","(1, 192, 2)"
Count,160 Tasks,40 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 122.88 kB 3.07 kB Shape (40, 192, 2) (1, 192, 2) Count 160 Tasks 40 Chunks Type float64 numpy.ndarray",2  192  40,

Unnamed: 0,Array,Chunk
Bytes,122.88 kB,3.07 kB
Shape,"(40, 192, 2)","(1, 192, 2)"
Count,160 Tasks,40 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,184.32 kB,4.61 kB
Shape,"(40, 288, 2)","(1, 288, 2)"
Count,160 Tasks,40 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 184.32 kB 4.61 kB Shape (40, 288, 2) (1, 288, 2) Count 160 Tasks 40 Chunks Type float64 numpy.ndarray",2  288  40,

Unnamed: 0,Array,Chunk
Bytes,184.32 kB,4.61 kB
Shape,"(40, 288, 2)","(1, 288, 2)"
Count,160 Tasks,40 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,26.65 GB,666.21 MB
Shape,"(40, 3012, 192, 288)","(1, 3012, 192, 288)"
Count,247 Tasks,40 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 26.65 GB 666.21 MB Shape (40, 3012, 192, 288) (1, 3012, 192, 288) Count 247 Tasks 40 Chunks Type float32 numpy.ndarray",40  1  288  192  3012,

Unnamed: 0,Array,Chunk
Bytes,26.65 GB,666.21 MB
Shape,"(40, 3012, 192, 288)","(1, 3012, 192, 288)"
Count,247 Tasks,40 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.93 MB,48.19 kB
Shape,"(40, 3012, 2)","(1, 3012, 2)"
Count,244 Tasks,40 Chunks
Type,object,numpy.ndarray
"Array Chunk Bytes 1.93 MB 48.19 kB Shape (40, 3012, 2) (1, 3012, 2) Count 244 Tasks 40 Chunks Type object numpy.ndarray",2  3012  40,

Unnamed: 0,Array,Chunk
Bytes,1.93 MB,48.19 kB
Shape,"(40, 3012, 2)","(1, 3012, 2)"
Count,244 Tasks,40 Chunks
Type,object,numpy.ndarray


### Next I'll load surface air temperature, and subset to 20 years to make this a little less tedious. 

The `.persist()` method allows the data to be loaded into memory, but still keep it as a chunked dask array for parallel computation. If you are doing a bunch of computation on the same data, and it fits into memory, this is useful.

By default, this loads the data chunked across the `ensmem` dimension, so there are 40 chunks.

In [8]:
t = mfds['tas'].sel(time=slice('1980-01', '1999-12')).persist()
#t = t.chunk({'ensmem': 40, 'time': 12}).persist()
t

Unnamed: 0,Array,Chunk
Bytes,2.12 GB,53.08 MB
Shape,"(40, 240, 192, 288)","(1, 240, 192, 288)"
Count,40 Tasks,40 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 2.12 GB 53.08 MB Shape (40, 240, 192, 288) (1, 240, 192, 288) Count 40 Tasks 40 Chunks Type float32 numpy.ndarray",40  1  288  192  240,

Unnamed: 0,Array,Chunk
Bytes,2.12 GB,53.08 MB
Shape,"(40, 240, 192, 288)","(1, 240, 192, 288)"
Count,40 Tasks,40 Chunks
Type,float32,numpy.ndarray


### Now lets compute the standard deviation across ensemble members, and time it. 

Dask has to do a lot of splitting and recombining of chunks, since it is computing the standard deviation across the chunks. this makes it slow, and it takes about 16 seconds.

In [9]:
start = time.time()
test = t.std('ensmem').compute()
end = time.time()

print(f'Elapsed time = {end-start}')

Elapsed time = 16.885666847229004


### Now I will rechunk the array across time, so that each chunk contains all ensemble members

In [10]:
t = t.chunk({'ensmem': 40, 'time': 12}).persist()
t

Unnamed: 0,Array,Chunk
Bytes,2.12 GB,106.17 MB
Shape,"(40, 240, 192, 288)","(40, 12, 192, 288)"
Count,20 Tasks,20 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 2.12 GB 106.17 MB Shape (40, 240, 192, 288) (40, 12, 192, 288) Count 20 Tasks 20 Chunks Type float32 numpy.ndarray",40  1  288  192  240,

Unnamed: 0,Array,Chunk
Bytes,2.12 GB,106.17 MB
Shape,"(40, 240, 192, 288)","(40, 12, 192, 288)"
Count,20 Tasks,20 Chunks
Type,float32,numpy.ndarray


### Now let's do the same computation. It is much faster.

In [11]:
start = time.time()
test = t.std('ensmem').compute()
end = time.time()

print(f'Elapsed time = {end-start}')

Elapsed time = 2.1835272312164307


### Finally I will close the cluster

In [14]:
cluster.close()

In [15]:
cluster = SLURMCluster(project='UWAS0052')
cluster.adapt(maximum_jobs=35)

client = Client(cluster)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 44541 instead
  http_address["port"], self.http_server.port


0,1
Client  Scheduler: tcp://10.12.205.30:46694  Dashboard: http://10.12.205.30:44541/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [17]:
ddir = '/glade/collections/cdg/data/CLIVAR_LE/mpi_lens/Amon/tas/'
dfiles = sorted(glob(ddir + '*rcp85*.nc'))
len(dfiles)

100

In [20]:
mfds = xr.open_mfdataset(dfiles, combine='nested', concat_dim='ensmem', parallel=True)
mfds

Unnamed: 0,Array,Chunk
Bytes,4.80 MB,48.00 kB
Shape,"(100, 3000, 2)","(1, 3000, 2)"
Count,400 Tasks,100 Chunks
Type,datetime64[ns],numpy.ndarray
"Array Chunk Bytes 4.80 MB 48.00 kB Shape (100, 3000, 2) (1, 3000, 2) Count 400 Tasks 100 Chunks Type datetime64[ns] numpy.ndarray",2  3000  100,

Unnamed: 0,Array,Chunk
Bytes,4.80 MB,48.00 kB
Shape,"(100, 3000, 2)","(1, 3000, 2)"
Count,400 Tasks,100 Chunks
Type,datetime64[ns],numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,307.20 kB,3.07 kB
Shape,"(100, 192, 2)","(1, 192, 2)"
Count,400 Tasks,100 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 307.20 kB 3.07 kB Shape (100, 192, 2) (1, 192, 2) Count 400 Tasks 100 Chunks Type float64 numpy.ndarray",2  192  100,

Unnamed: 0,Array,Chunk
Bytes,307.20 kB,3.07 kB
Shape,"(100, 192, 2)","(1, 192, 2)"
Count,400 Tasks,100 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,153.60 kB,1.54 kB
Shape,"(100, 96, 2)","(1, 96, 2)"
Count,400 Tasks,100 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 153.60 kB 1.54 kB Shape (100, 96, 2) (1, 96, 2) Count 400 Tasks 100 Chunks Type float64 numpy.ndarray",2  96  100,

Unnamed: 0,Array,Chunk
Bytes,153.60 kB,1.54 kB
Shape,"(100, 96, 2)","(1, 96, 2)"
Count,400 Tasks,100 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,22.12 GB,221.18 MB
Shape,"(100, 3000, 96, 192)","(1, 3000, 96, 192)"
Count,400 Tasks,100 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 22.12 GB 221.18 MB Shape (100, 3000, 96, 192) (1, 3000, 96, 192) Count 400 Tasks 100 Chunks Type float32 numpy.ndarray",100  1  192  96  3000,

Unnamed: 0,Array,Chunk
Bytes,22.12 GB,221.18 MB
Shape,"(100, 3000, 96, 192)","(1, 3000, 96, 192)"
Count,400 Tasks,100 Chunks
Type,float32,numpy.ndarray


In [34]:
t = mfds['tas'].chunk({'ensmem': 100, 'time': 100}).persist()
t

Unnamed: 0,Array,Chunk
Bytes,22.12 GB,737.28 MB
Shape,"(100, 3000, 96, 192)","(100, 100, 96, 192)"
Count,30 Tasks,30 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 22.12 GB 737.28 MB Shape (100, 3000, 96, 192) (100, 100, 96, 192) Count 30 Tasks 30 Chunks Type float32 numpy.ndarray",100  1  192  96  3000,

Unnamed: 0,Array,Chunk
Bytes,22.12 GB,737.28 MB
Shape,"(100, 3000, 96, 192)","(100, 100, 96, 192)"
Count,30 Tasks,30 Chunks
Type,float32,numpy.ndarray


In [None]:
start = time.time()
tmean = t.mean(('lon', 'ensmem')).compute()
end = time.time()

print(f'Elapsed time = {end-start}')

In [28]:
t

Unnamed: 0,Array,Chunk
Bytes,22.12 GB,221.18 MB
Shape,"(100, 3000, 96, 192)","(1, 3000, 96, 192)"
Count,400 Tasks,100 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 22.12 GB 221.18 MB Shape (100, 3000, 96, 192) (1, 3000, 96, 192) Count 400 Tasks 100 Chunks Type float32 numpy.ndarray",100  1  192  96  3000,

Unnamed: 0,Array,Chunk
Bytes,22.12 GB,221.18 MB
Shape,"(100, 3000, 96, 192)","(1, 3000, 96, 192)"
Count,400 Tasks,100 Chunks
Type,float32,numpy.ndarray
