In [None]:
#%load_ext nb_black

In [7]:
#!pip install climpred

In [1]:
# Import some python libraries
%matplotlib inline

import numpy as np
import xarray as xr
import matplotlib.pyplot as plt

In [2]:
# Start a Dask Gateway cluster for the computations in this notebook.
from dask_gateway import Gateway
from dask.distributed import Client

gateway = Gateway()
options = gateway.cluster_options()
options.worker_cores = 4
cluster = gateway.new_cluster(cluster_options=options)

cluster.scale(16) # scale up if more performance required
# Pause for a fixed 10 s after requesting workers, presumably to give them time to start.
# NOTE(review): a fixed sleep does not guarantee that all requested workers are
# up; the client summary displayed by the next cell reports 8 workers.
from time import sleep
sleep(10)

In [3]:
# Obtain a client from the gateway cluster and restart it before running anything.
client = cluster.get_client()
client.restart()

0,1
Client  Scheduler: gateway://traefik-gcp-uscentral1b-prod-dask-gateway.prod:80/prod.bdc44f2594d64a2ca90cd6703ab41c58  Dashboard: /services/dask-gateway/clusters/prod.bdc44f2594d64a2ca90cd6703ab41c58/status,Cluster  Workers: 8  Cores: 32  Memory: 34.36 GB


---

# Introduction

This notebook demonstrates the performance of `climpred` on large datasets.
Here, we run `PredictionEnsemble.bootstrap` to compute a p-value that tests whether the initialized skill surpasses the uninitialized skill.

---

# `PerfectModelEnsemble`

## fake data

In [4]:
def gen_pm(degree=5, chunking_dim='lon', nmember=10, nlead=5, seed=None):
    """Generate random fake data for a perfect-model setup.

    Parameters
    ----------
    degree : int
        Nominal grid spacing in degrees. The grid has ``360 // degree``
        longitudes and ``180 // degree`` latitudes located at cell centres
        (``degree / 2``, ``3 * degree / 2``, ...).
    chunking_dim : str or None
        Dimension along which both returned arrays are chunked with
        ``'auto'`` chunk size. If None, the initialized ensemble is left
        unchunked; the control run is still chunked along ``time``.
    nmember : int
        Number of ensemble members (labelled ``1..nmember``).
    nlead : int
        Number of lead steps (labelled ``1..nlead``, units ``'years'``).
    seed : int or None
        If given, the random values are drawn from a private
        ``np.random.RandomState(seed)`` so the data is reproducible. If None,
        numpy's global random state is used.

    Returns
    -------
    ds : xr.DataArray
        Initialized ensemble named ``'var'`` with dims
        ``(member, init, lead, lon, lat)`` and 12 inits spaced 10 years apart,
        starting in year 3000.
    control : xr.DataArray
        Control run named ``'var'`` with dims ``(time, lon, lat)`` and 300
        annual time steps starting in year 3000.
    """
    ninit = 12
    nx = 360 // degree
    ny = 180 // degree
    control_start = 3000
    control_end = 3300
    ntime = control_end - control_start

    # Without a seed keep drawing from numpy's global generator, so existing
    # callers (including ones that call np.random.seed first) are unaffected.
    rng = np.random if seed is None else np.random.RandomState(seed)

    times = xr.cftime_range(
        start=str(control_start),
        periods=ntime,
        freq='YS',
        calendar='noleap',
    )
    leads = np.arange(1, 1 + nlead)
    members = np.arange(1, 1 + nmember)
    inits = xr.cftime_range(
        start=str(control_start),
        periods=ninit,
        freq='10YS',
        calendar='noleap',
    )

    # Cell centres of a regular grid with `degree` spacing. The previous
    # hard-coded 0.5/359.5 and -89.5/89.5 end points were only correct for
    # degree=1 and stretched the spacing for every other resolution.
    lons = xr.DataArray(
        degree / 2 + degree * np.arange(nx),
        dims=('lon',),
        attrs={'units': 'degrees east', 'long_name': 'longitude'},
    )
    lats = xr.DataArray(
        -90 + degree / 2 + degree * np.arange(ny),
        dims=('lat',),
        attrs={'units': 'degrees north', 'long_name': 'latitude'},
    )

    ds = xr.Dataset()
    ds['var'] = xr.DataArray(
        rng.random_sample((nmember, ninit, nlead, nx, ny)),
        coords={
            'member': members,
            'init': inits,
            'lon': lons,
            'lat': lats,
            'lead': leads,
        },
        dims=('member', 'init', 'lead', 'lon', 'lat'),
        name='var',
    )
    ds.lead.attrs['units'] = 'years'
    ds = ds['var']

    control = xr.Dataset()
    control['var'] = xr.DataArray(
        rng.random_sample((ntime, nx, ny)),
        coords={'lon': lons, 'lat': lats, 'time': times},
        dims=('time', 'lon', 'lat'),
        name='var',
        attrs={'units': 'var units', 'description': 'a description'},
    )
    # The control run is always chunked along time, independent of chunking_dim.
    control = control['var'].chunk({'time': 'auto'})

    if chunking_dim is not None:
        ds = ds.chunk({chunking_dim: 'auto'})
        control = control.chunk({chunking_dim: 'auto'})

    return ds, control

# Build the fake initialized ensemble and control run with the default settings.
ds, control = gen_pm()

## `verify()`

In [8]:
from climpred import PerfectModelEnsemble
# Wrap the initialized ensemble and attach the control run to it.
pm = PerfectModelEnsemble(ds).add_control(control)

In [20]:
# Keyword arguments passed to both pm.verify() and pm.bootstrap() below.
cp_kwargs = {'metric': 'mae', 'comparison':'m2e', 'dim':['init']}

In [10]:
# Display the array backing the initialized data to inspect its shape and chunking.
pm.get_initialized()['var'].data

Unnamed: 0,Array,Chunk
Bytes,12.44 MB,12.44 MB
Shape,"(10, 12, 5, 72, 36)","(10, 12, 5, 72, 36)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 12.44 MB 12.44 MB Shape (10, 12, 5, 72, 36) (10, 12, 5, 72, 36) Count 1 Tasks 1 Chunks Type float64 numpy.ndarray",12  10  36  72  5,

Unnamed: 0,Array,Chunk
Bytes,12.44 MB,12.44 MB
Shape,"(10, 12, 5, 72, 36)","(10, 12, 5, 72, 36)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray


In [21]:
# Time the verify() call itself; the displayed array still lists pending tasks,
# and the values are computed in the next cell via .compute().
%time skill = pm.verify(**cp_kwargs)
skill['var'].data

CPU times: user 216 ms, sys: 1.12 ms, total: 217 ms
Wall time: 354 ms


Unnamed: 0,Array,Chunk
Bytes,1.04 MB,1.04 MB
Shape,"(36, 5, 72, 10)","(36, 5, 72, 10)"
Count,109 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.04 MB 1.04 MB Shape (36, 5, 72, 10) (36, 5, 72, 10) Count 109 Tasks 1 Chunks Type float64 numpy.ndarray",36  1  10  72  5,

Unnamed: 0,Array,Chunk
Bytes,1.04 MB,1.04 MB
Shape,"(36, 5, 72, 10)","(36, 5, 72, 10)"
Count,109 Tasks,1 Chunks
Type,float64,numpy.ndarray


In [None]:
# Time the actual computation of the skill values.
%time skillc = skill.compute()

## `bootstrap()`

Use bootstrapping to test whether the initialized skill is significantly better than the uninitialized skill.

In [24]:
# Number of bootstrap iterations passed to pm.bootstrap() below.
iterations=100

In [30]:
%%time
# Times only the bootstrap() call; the values are computed in the next cell
# via .compute(). The uninitialized skill is used as the reference.
bskill = pm.bootstrap(**cp_kwargs, iterations=iterations, reference='uninitialized')
bskill['var'].data

  f"Chunking might not bring parallelized performance increase, "


CPU times: user 2.71 s, sys: 247 ms, total: 2.95 s
Wall time: 3.57 s


Unnamed: 0,Array,Chunk
Bytes,829.44 kB,207.36 kB
Shape,"(4, 2, 36, 5, 72)","(2, 1, 36, 5, 72)"
Count,13080 Tasks,6 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 829.44 kB 207.36 kB Shape (4, 2, 36, 5, 72) (2, 1, 36, 5, 72) Count 13080 Tasks 6 Chunks Type float64 numpy.ndarray",2  4  72  5  36,

Unnamed: 0,Array,Chunk
Bytes,829.44 kB,207.36 kB
Shape,"(4, 2, 36, 5, 72)","(2, 1, 36, 5, 72)"
Count,13080 Tasks,6 Chunks
Type,float64,numpy.ndarray


In [None]:
# Boolean mask: True where the 'p' result for the 'uninitialized' reference is at most 0.05.
improved_by_init = bskill.sel(results='p', skill='uninitialized') <= 0.05
# Time the computation of the mask.
%time improved_by_init_c = improved_by_init.compute()

# `HindcastEnsemble`

## fake data

In [33]:
def gen_hind(degree=5,nmember=34,nlead=10, chunking_dim='lon', seed=None):
    """Generate random fake data for a hindcast setup.

    Parameters
    ----------
    degree : int
        Nominal grid spacing in degrees. The grid has ``360 // degree``
        longitudes and ``180 // degree`` latitudes located at cell centres
        (``degree / 2``, ``3 * degree / 2``, ...).
    nmember : int
        Number of ensemble members (labelled ``1..nmember``) of the hindcast
        and of the uninitialized ensemble.
    nlead : int
        Number of lead steps (labelled ``1..nlead``, units ``'years'``).
    chunking_dim : str or None
        Dimension along which observations and the uninitialized ensemble are
        chunked with ``'auto'`` chunk size. The hindcast is chunked the same
        way and then rechunked to a single chunk along ``lon`` and chunks of
        size 1 along ``lead``. If None, nothing is chunked.
    seed : int or None
        If given, the random values are drawn from a private
        ``np.random.RandomState(seed)`` so the data is reproducible. If None,
        numpy's global random state is used.

    Returns
    -------
    hind : xr.DataArray
        Hindcast named ``'var'`` with dims ``(member, init, lead, lon, lat)``
        and 55 annual inits from 1960 to 2014.
    uninit : xr.DataArray
        Uninitialized ensemble named ``'var'`` with dims
        ``(time, lon, lat, member)`` on the same annual dates.
    observations : xr.DataArray
        Observations named ``'var'`` with dims ``(time, lon, lat)`` on the
        same annual dates.
    """
    nx = 360 // degree
    ny = 180 // degree
    init_start = 1960
    init_end = 2015
    ninit = init_end - init_start

    # Without a seed keep drawing from numpy's global generator, so existing
    # callers (including ones that call np.random.seed first) are unaffected.
    rng = np.random if seed is None else np.random.RandomState(seed)

    leads = np.arange(1, 1 + nlead)
    members = np.arange(1, 1 + nmember)
    # One date per year from init_start to init_end - 1, i.e. ninit dates;
    # used both as the hindcast inits and as the time axis of the other arrays.
    inits = xr.cftime_range(
        start=str(init_start), end=str(init_end-1),
        freq='YS',
    )

    # Cell centres of a regular grid with `degree` spacing. The previous
    # hard-coded 0.5/359.5 and -89.5/89.5 end points were only correct for
    # degree=1 and stretched the spacing for every other resolution.
    lons = xr.DataArray(
        degree / 2 + degree * np.arange(nx),
        dims=('lon',),
        attrs={'units': 'degrees east', 'long_name': 'longitude'},
    )
    lats = xr.DataArray(
        -90 + degree / 2 + degree * np.arange(ny),
        dims=('lat',),
        attrs={'units': 'degrees north', 'long_name': 'latitude'},
    )

    hind = xr.Dataset()
    hind['var'] = xr.DataArray(
        rng.random_sample((nmember, ninit, nlead, nx, ny)),
        coords={
            'member': members,
            'init': inits,
            'lon': lons,
            'lat': lats,
            'lead': leads,
        },
        dims=('member', 'init', 'lead', 'lon', 'lat'),
        name='var',
    )

    observations = xr.Dataset()
    observations['var'] = xr.DataArray(
        rng.random_sample((ninit, nx, ny)),
        coords={'lon': lons, 'lat': lats, 'time': inits},
        dims=('time', 'lon', 'lat'),
        name='var',
        attrs={'units': 'var units', 'description': 'a description'},
    )

    uninit = xr.Dataset()
    uninit['var'] = xr.DataArray(
        rng.random_sample((ninit, nx, ny, nmember)),
        coords={'lon': lons, 'lat': lats, 'time': inits, 'member': members},
        dims=('time', 'lon', 'lat', 'member'),
        name='var',
    )

    if chunking_dim is not None:
        observations = observations.chunk({chunking_dim: 'auto'})
        uninit = uninit.chunk({chunking_dim: 'auto'})
        hind = hind.chunk({chunking_dim: 'auto'})
        # The hindcast always ends up with a single chunk along lon and one
        # chunk per lead; with the default chunking_dim='lon' this undoes the
        # 'auto' chunking applied on the line above.
        hind = hind.chunk({'lon': -1})
        hind = hind.chunk({'lead': 1})

    observations = observations['var']
    uninit = uninit['var']
    hind = hind['var']
    hind.lead.attrs['units'] = 'years'

    return hind, uninit, observations

# gen_hind() returns (hind, uninit, observations); the uninitialized ensemble
# is bound to `hist` here.
hind,hist,obs = gen_hind()

## `verify()`

In [34]:
from climpred import HindcastEnsemble
# Wrap the hindcast and attach the uninitialized ensemble and the observations.
hindcast = HindcastEnsemble(hind).add_uninitialized(hist).add_observations(obs)

In [35]:
# Rebinds cp_kwargs (previously the perfect-model settings) to the keyword
# arguments passed to hindcast.verify() and hindcast.bootstrap() below.
cp_kwargs = {'metric': 'mae', 'comparison':'e2r', 'dim':'init', 'alignment':'same_verifs'}

In [36]:
# Display the array backing the initialized data to inspect its shape and chunking.
hindcast.get_initialized()['var'].data

Unnamed: 0,Array,Chunk
Bytes,387.76 MB,38.78 MB
Shape,"(34, 55, 10, 72, 36)","(34, 55, 1, 72, 36)"
Count,24 Tasks,10 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 387.76 MB 38.78 MB Shape (34, 55, 10, 72, 36) (34, 55, 1, 72, 36) Count 24 Tasks 10 Chunks Type float64 numpy.ndarray",55  34  36  72  10,

Unnamed: 0,Array,Chunk
Bytes,387.76 MB,38.78 MB
Shape,"(34, 55, 10, 72, 36)","(34, 55, 1, 72, 36)"
Count,24 Tasks,10 Chunks
Type,float64,numpy.ndarray


In [38]:
# Time the verify() call itself; the displayed array still lists pending tasks,
# and the values are computed in the next cell via .compute().
# Note: this rebinds `skill` from the perfect-model section.
%time skill = hindcast.verify(**cp_kwargs)
skill['var'].data

CPU times: user 554 ms, sys: 1.81 ms, total: 556 ms
Wall time: 634 ms


Unnamed: 0,Array,Chunk
Bytes,207.36 kB,20.74 kB
Shape,"(10, 36, 72)","(1, 36, 72)"
Count,149 Tasks,10 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 207.36 kB 20.74 kB Shape (10, 36, 72) (1, 36, 72) Count 149 Tasks 10 Chunks Type float64 numpy.ndarray",72  36  10,

Unnamed: 0,Array,Chunk
Bytes,207.36 kB,20.74 kB
Shape,"(10, 36, 72)","(1, 36, 72)"
Count,149 Tasks,10 Chunks
Type,float64,numpy.ndarray


In [91]:
# Time the actual computation of the skill values.
%time skillc = skill.compute()

CPU times: user 653 ms, sys: 226 ms, total: 879 ms
Wall time: 375 ms


## `bootstrap()`
Use bootstrapping to test whether the initialized skill is significantly better than the uninitialized skill.

In [39]:
# Number of bootstrap iterations passed to hindcast.bootstrap() below.
iterations=100

In [None]:
%%time
# Times the bootstrap() call; the explicit .compute() happens in the next cell.
# The uninitialized skill is used as the reference.
bskill = hindcast.bootstrap(iterations=iterations, reference=['uninitialized'], **cp_kwargs)
bskill['var'].data

In [None]:
# Boolean mask: True where the 'p' result for the 'uninitialized' reference is at most 0.05.
improved_by_init = bskill.sel(results='p',skill='uninitialized') <= 0.05
# Time the computation of the mask.
%time improved_by_init_c = improved_by_init.compute()

# Close down

In [23]:
# Close the client first, then the gateway cluster it was connected to.
client.close()
cluster.close()