# Compute on cal1


## open_mfdataset in parallel

Adapted from https://github.com/pydata/xarray/pull/1983 (parallel `open_mfdataset`).

### Benchmark with a collection of 365 files containing NATL60 2D arrays, 1.2G per file

   * no parallel, no autoclose :

In [None]:
In [10]: %%time
    ...: with dask.set_options(get=dask.threaded.get):
    ...:     ds=xr.open_mfdataset('/mnt/meom/MODEL_SET/NATL60/NATL60-CJM165-S/1h/*/NATL60-CJM165_y201*.1h_SSH.nc', concat_dim='time_counter',chunks={'x': 1000, 'y':1000, 'time_counter': 1})
    ...: 
---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)


   * no parallel, with autoclose :

In [None]:
In [11]: %%time
    ...: with dask.set_options(get=dask.threaded.get):
    ...:     ds=xr.open_mfdataset('/mnt/meom/MODEL_SET/NATL60/NATL60-CJM165-S/1h/*/NATL60-CJM165_y201*.1h_SSH.nc', concat_dim='time_counter',chunks={'x': 1000, 'y':1000, 'time_counter': 1}, autoclose=True)
    ...: 
    ...: 
---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)


   * in parallel, with dask.multiprocessing get :

In [None]:
In [8]: %%time
   ...: with dask.set_options(get=dask.multiprocessing.get):
   ...:     ds=xr.open_mfdataset('/mnt/meom/MODEL_SET/NATL60/NATL60-CJM165-S/1h/*/NATL60-CJM165_y201*.1h_SSH.nc',autoclose=True, concat_dim='time_counter',chunks={'x': 1000, 'y':1000, 'time_counter': 1}, parallel=True)
   ...: 
CPU times: user 3min 35s, sys: 1min 10s, total: 4min 45s
Wall time: 5min 40s



   * in parallel, with cluster get :

In [None]:
In [9]: %%time
   ...: with dask.set_options(get=c.get):
   ...:     ds=xr.open_mfdataset('/mnt/meom/MODEL_SET/NATL60/NATL60-CJM165-S/1h/*/NATL60-CJM165_y201*.1h_SSH.nc',autoclose=True, concat_dim='time_counter',chunks={'x': 1000, 'y':1000, 'time_counter': 1}, parallel=True)
   ...: 
CPU times: user 27.8 s, sys: 5.06 s, total: 32.8 s
Wall time: 52.3 s


### The code

In [None]:
import xarray as xr
import dask
import dask.threaded
import dask.multiprocessing
from dask.distributed import Client

# Start a dask.distributed client; its `get` method is handed to dask below
# so that the multi-file open is scheduled through this client.
c = Client()

# Bare expression kept from the original session: evaluated on its own it
# shows the client summary. Output recorded at the time (Out[7]):
#   <Client: scheduler='tcp://127.0.0.1:46849' processes=10 cores=10>
# (It was previously pasted as code, which made this cell a syntax error.)
c

# Open every hourly SSH file matching the glob as one dataset concatenated
# along `time_counter`, with 1000x1000 spatial chunks and one time step per
# chunk. `parallel=True` and `autoclose=True` match the fastest benchmark run.
# NOTE(review): `dask.set_options` and the `autoclose` argument are deprecated
# in later dask / xarray releases -- confirm against the pinned versions
# before re-running.
with dask.set_options(get=c.get):
    ds = xr.open_mfdataset(
        '/mnt/meom/MODEL_SET/NATL60/NATL60-CJM165-S/1h/*/NATL60-CJM165_y201*.1h_SSH.nc',
        autoclose=True,
        concat_dim='time_counter',
        chunks={'x': 1000, 'y': 1000, 'time_counter': 1},
        parallel=True,
    )
        


## Test with zarr options

In [None]:
import zarr                                                                                                   
# Blosc compressor using the zstd codec at level 3. shuffle=2 is the numeric
# code for Blosc's bit-shuffle filter (see the numcodecs Blosc documentation).
compressor = zarr.Blosc(
    cname='zstd',
    clevel=3,
    shuffle=2,
)

# Use the same compressor for every variable in `ds` (each variable gets its
# own encoding dict).
encoding = dict(
    (name, {'compressor': compressor})
    for name in ds.variables
)

# Write the dataset to a zarr store with dask's threaded scheduler.
with dask.set_options(get=dask.threaded.get):
    ds.to_zarr(
        store='/mnt/meom/MODEL_SET/NATL60/NATL60-CJM165-S/1h/SSH/test_zarr',
        encoding=encoding,
    )

