# <center>Part 1: Creating the Dask Cluster</center>

# Library imports

In [1]:
import dask
import dask.array as da
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from climate.utils import load_dataset
from climate.kalman_filter import EnsembleKalmanFilter

  from distributed.utils import tmpfile


# Define Cluster
Here we define the specifications for the individual workers in the cluster. In addition, we specify the queue for the jobs and the time allocated to each job (here, two hours).

Note that this only defines the blueprint of the cluster, no actual resources are allocated here.

In [25]:
# Blueprint of the SLURM-backed Dask cluster. This only describes what each
# job should look like; no resources are requested until the cluster is scaled.
cluster = SLURMCluster(
    # Per-job resources.
    cores=4,
    memory="32 GB",
    # Per-job time limit (two hours).
    walltime="02:00:00",
    death_timeout=6000,
    # Extra options handed to SLURM: per-CPU memory request and queue
    # selection (QoS and partition).
    job_extra=[
        '--mem-per-cpu=12GB',
        '--qos="job_epyc2"',
        '--partition="epyc2"',
    ],
)

# Allocate Cluster 

We now create a client that will use the cluster. The *client* is the entity that is in charge of submitting our computations to the cluster.

Finally, we allocate the resources by calling `cluster.scale(10)`, which will allocate a cluster with 10 workers.

In [26]:
# Connect a client to the cluster's scheduler; computations are submitted
# through this client from now on.
client = Client(cluster)

# Ask for 10 workers. This is the step that actually requests resources.
cluster.scale(n=10)

**Cluster Monitoring**: Note that the cluster automatically sets up an interactive monitoring dashboard available at port tcp/8787 of the scheduler. 

One can then set up a tunnel to that port of the scheduler to access the dashboard:

`ssh -N -f -L 8787:bnode034:8787 campusid@submit.unibe.ch`

Where `campusid` should be set to one's campus id login and `bnode034` should be set to the hostname of the scheduler (can be seen in the bash console of the scheduler).

In [16]:
# Display the cluster summary (dashboard address, number of workers, threads
# and memory) as the cell output.
cluster # We can also print cluster information from within the Jupyter console.
# Make sure to wait for the SLURM job scheduler to have time to allocate your workload.

0,1
Dashboard: http://10.1.3.15:8787/status,Workers: 12
Total threads: 12,Total memory: 89.40 GiB

0,1
Comm: tcp://10.1.3.15:15876,Workers: 12
Dashboard: http://10.1.3.15:8787/status,Total threads: 12
Started: 1 hour ago,Total memory: 89.40 GiB

0,1
Comm: tcp://10.1.3.4:31904,Total threads: 1
Dashboard: http://10.1.3.4:3496/status,Memory: 7.45 GiB
Nanny: tcp://10.1.3.4:26159,
Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-blmdeshl,Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-blmdeshl

0,1
Comm: tcp://10.1.3.4:15547,Total threads: 1
Dashboard: http://10.1.3.4:33693/status,Memory: 7.45 GiB
Nanny: tcp://10.1.3.4:17924,
Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-6bbo2kf3,Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-6bbo2kf3

0,1
Comm: tcp://10.1.3.4:32143,Total threads: 1
Dashboard: http://10.1.3.4:20693/status,Memory: 7.45 GiB
Nanny: tcp://10.1.3.4:26840,
Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-27a9wl9p,Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-27a9wl9p

0,1
Comm: tcp://10.1.3.4:19337,Total threads: 1
Dashboard: http://10.1.3.4:5460/status,Memory: 7.45 GiB
Nanny: tcp://10.1.3.4:8301,
Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-fnx5teq0,Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-fnx5teq0

0,1
Comm: tcp://10.1.3.20:4394,Total threads: 1
Dashboard: http://10.1.3.20:27068/status,Memory: 7.45 GiB
Nanny: tcp://10.1.3.20:8708,
Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-s8l5_v2b,Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-s8l5_v2b

0,1
Comm: tcp://10.1.3.20:30410,Total threads: 1
Dashboard: http://10.1.3.20:10480/status,Memory: 7.45 GiB
Nanny: tcp://10.1.3.20:6382,
Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-et70tsv3,Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-et70tsv3

0,1
Comm: tcp://10.1.3.20:22617,Total threads: 1
Dashboard: http://10.1.3.20:32061/status,Memory: 7.45 GiB
Nanny: tcp://10.1.3.20:15406,
Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-0w78b3p_,Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-0w78b3p_

0,1
Comm: tcp://10.1.3.20:4149,Total threads: 1
Dashboard: http://10.1.3.20:13052/status,Memory: 7.45 GiB
Nanny: tcp://10.1.3.20:16658,
Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-8m74n1i5,Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-8m74n1i5

0,1
Comm: tcp://10.1.3.3:29432,Total threads: 1
Dashboard: http://10.1.3.3:4965/status,Memory: 7.45 GiB
Nanny: tcp://10.1.3.3:28927,
Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-77d7f8qt,Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-77d7f8qt

0,1
Comm: tcp://10.1.3.3:10856,Total threads: 1
Dashboard: http://10.1.3.3:30679/status,Memory: 7.45 GiB
Nanny: tcp://10.1.3.3:17877,
Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-diazwl_p,Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-diazwl_p

0,1
Comm: tcp://10.1.3.3:16785,Total threads: 1
Dashboard: http://10.1.3.3:20728/status,Memory: 7.45 GiB
Nanny: tcp://10.1.3.3:19308,
Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-4btksc0p,Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-4btksc0p

0,1
Comm: tcp://10.1.3.3:18098,Total threads: 1
Dashboard: http://10.1.3.3:30577/status,Memory: 7.45 GiB
Nanny: tcp://10.1.3.3:5836,
Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-487d033t,Local directory: /storage/homefs/ct19x463/Dev/Climate/examples/ubelix/dask-worker-space/worker-487d033t


# <center>Part 2: Loading the Data</center>

Data loading is performed by the `load_dataset` function. The user only has to specify the location of the folder containing the data. 

Note that due to the amount of data for the different ensemble members, reading the whole dataset can be slow on systems with slow disks (as is painfully the case in UBELIX). 
To that end, the user can choose to only load the first `n` ensemble members to run quick tests. The user can also set `ignore_members` to `True`, 
which will return a fake dataset that has the same size as the full one (basically, for each ensemble member we use the ensemble mean). 
This functionality is useful when one wants to work with objects that have the same size as the full dataset but which can be loaded faster.

In [5]:
# Folder containing the data on the cluster file system.
base_folder = "/storage/homefs/ct19x463/Dev/Climate/Data/"
# Value passed to the loader as its second argument (3 here, matching the
# three member folders listed in the output below).
TOT_ENSEMBLES_NUMBER = 3

# The loading function returns 5 objects, unpacked in this order: the ensemble
# mean, the ensemble members, the instrumental data, the reference dataset and
# a fifth dataset (presumably a zarr-backed version of the ensemble members,
# judging by its name -- TODO confirm).
(
    dataset_mean,
    dataset_members,
    dataset_instrumental,
    dataset_reference,
    dataset_members_zarr,
) = load_dataset(base_folder, TOT_ENSEMBLES_NUMBER, ignore_members=False)
print("Loading done.")

/storage/homefs/ct19x463/Dev/Climate/Data/Ensembles/Members/member_1/
/storage/homefs/ct19x463/Dev/Climate/Data/Ensembles/Members/member_2/
/storage/homefs/ct19x463/Dev/Climate/Data/Ensembles/Members/member_3/


  sample = dates.ravel()[0]
  dataset_mean['time'] = dataset_mean.indexes['time'].to_datetimeindex()
  dataset_members['time'] = dataset_members.indexes['time'].to_datetimeindex()


Loading done.


Note that the data loading does not put any actual data into memory. Also, the datasets returned by this method are quite raw to work with.

There are wrappers to ease data access. The easiest way to wrap data into an accessible format is to let a KalmanFilter object handle that for us under the hood.

In [27]:
# Wrap the raw datasets in an ensemble Kalman filter object, which gives
# easier access to the data (see `my_filter.dataset_members` below).
# EnsembleKalmanFilter is already imported in the imports cell at the top of
# the notebook, so it is not re-imported here.
my_filter = EnsembleKalmanFilter(
    dataset_mean,
    dataset_members_zarr,
    dataset_instrumental,
    client,
)

Maximal distance to matched point: 120.54565778878536 km.


**Remark:** Note that in the background the Kalman filter loads the datasets into wrapper objects that ease data access.

One can then easily get data vectors for a given time period using the `get_window_vector` function of the wrapped datasets. 
This function returns data vectors with the latitude, longitude and time dimensions concatenated in some fixed order.

In [None]:
# Bounds of the time window used in the rest of the notebook.
time_begin, time_end = '1961-01-16', '1961-06-16'

# Data vector of the ensemble members over that window, shown as the cell
# output (a chunked array, per the recorded output).
my_filter.dataset_members.get_window_vector(time_begin, time_end)

Unnamed: 0,Array,Chunk
Bytes,12.66 MiB,39.06 kiB
Shape,"(30, 110592)","(1, 10000)"
Count,68070 Tasks,360 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 12.66 MiB 39.06 kiB Shape (30, 110592) (1, 10000) Count 68070 Tasks 360 Chunks Type float32 numpy.ndarray",110592  30,

Unnamed: 0,Array,Chunk
Bytes,12.66 MiB,39.06 kiB
Shape,"(30, 110592)","(1, 10000)"
Count,68070 Tasks,360 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,864.00 kiB,621.00 kiB
Shape,"(110592,)","(79488,)"
Count,130 Tasks,2 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 864.00 kiB 621.00 kiB Shape (110592,) (79488,) Count 130 Tasks 2 Chunks Type float64 numpy.ndarray",110592  1,

Unnamed: 0,Array,Chunk
Bytes,864.00 kiB,621.00 kiB
Shape,"(110592,)","(79488,)"
Count,130 Tasks,2 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,864.00 kiB,621.00 kiB
Shape,"(110592,)","(79488,)"
Count,130 Tasks,2 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 864.00 kiB 621.00 kiB Shape (110592,) (79488,) Count 130 Tasks 2 Chunks Type float64 numpy.ndarray",110592  1,

Unnamed: 0,Array,Chunk
Bytes,864.00 kiB,621.00 kiB
Shape,"(110592,)","(79488,)"
Count,130 Tasks,2 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,864.00 kiB,621.00 kiB
Shape,"(110592,)","(79488,)"
Count,130 Tasks,2 Chunks
Type,datetime64[ns],numpy.ndarray
"Array Chunk Bytes 864.00 kiB 621.00 kiB Shape (110592,) (79488,) Count 130 Tasks 2 Chunks Type datetime64[ns] numpy.ndarray",110592  1,

Unnamed: 0,Array,Chunk
Bytes,864.00 kiB,621.00 kiB
Shape,"(110592,)","(79488,)"
Count,130 Tasks,2 Chunks
Type,datetime64[ns],numpy.ndarray


In [None]:
# Send the window vector of the ensemble members to the cluster and keep a
# handle to it.
window_vector = client.scatter(
    my_filter.dataset_members.get_window_vector(time_begin, time_end)
)

# Getting the covariance is also simple.

In [28]:
# Build the ensemble covariance for the selected time window. The result is
# shown as a chunked array; the computation itself is triggered in a later cell.
cov = my_filter.get_ensemble_covariance(time_begin, time_end)
# Display the array summary (shape, chunks, task count).
cov

  intermediate = blockwise(


Unnamed: 0,Array,Chunk
Bytes,45.56 GiB,381.47 MiB
Shape,"(110592, 110592)","(10000, 10000)"
Count,80010 Tasks,144 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 45.56 GiB 381.47 MiB Shape (110592, 110592) (10000, 10000) Count 80010 Tasks 144 Chunks Type float32 numpy.ndarray",110592  110592,

Unnamed: 0,Array,Chunk
Bytes,45.56 GiB,381.47 MiB
Shape,"(110592, 110592)","(10000, 10000)"
Count,80010 Tasks,144 Chunks
Type,float32,numpy.ndarray


One can trigger the computation of the covariance using `client.persist()`. This will keep the result distributed on the cluster. 
Note that full covariance computation takes less than 1 minute.

In [29]:
# Trigger the covariance computation on the cluster and keep the result
# distributed across the workers.
cov = client.persist(cov)

2022-03-30 11:54:33,613 - distributed.scheduler - ERROR - Error transitioning 'original-open_dataset-aeb882a396ff182f04673b7bc9d9ca23air_temperature-f7ea83e46dde6e2ffc915e453ffe60fa' from 'erred' to 'memory'
Traceback (most recent call last):
  File "/storage/homefs/ct19x463/envs/myenv/lib/python3.8/site-packages/distributed/scheduler.py", line 2321, in _transition
    assert not args and not kwargs, (args, kwargs, start_finish)
AssertionError: ((), {'worker': 'tcp://10.1.3.4:3242', 'nbytes': 48, 'typename': 'xarray.core.indexing.ImplicitToExplicitIndexingAdapter'}, ('erred', 'memory'))
2022-03-30 11:54:33,627 - distributed.utils - ERROR - ((), {'worker': 'tcp://10.1.3.4:3242', 'nbytes': 48, 'typename': 'xarray.core.indexing.ImplicitToExplicitIndexingAdapter'}, ('erred', 'memory'))
Traceback (most recent call last):
  File "/storage/homefs/ct19x463/envs/myenv/lib/python3.8/site-packages/distributed/utils.py", line 693, in log_errors
    yield
  File "/storage/homefs/ct19x463/envs/myenv

In [None]:
# Evaluate a single entry (row 10, column 10) of the covariance as a quick check.
cov[10, 10].compute()

In [None]:
# Display the cluster summary again to check the current state of the workers.
cluster

In [None]:
# Shut down the scheduler and workers to release the allocated resources.
# The method must be called: a bare `client.shutdown` only displays the bound
# method and leaves the cluster running.
client.shutdown()