In [1]:
import xarray as xr
import numpy as np
from dask.distributed import Client, LocalCluster

### Set up a single-machine Dask cluster and client

In [2]:
cluster = LocalCluster()  # Create a local Dask cluster on this machine
cluster.scale(3)  # Request three workers
# Attach a client to the cluster. Creating a Client registers it as the
# default scheduler, so the Dask-backed work in later cells actually runs on
# these workers; a LocalCluster on its own is never used for computation.
client = Client(cluster)
cluster  # Display the cluster summary as the cell output

Perhaps you already have a cluster running?
Hosting the HTTP server on port 34693 instead
  http_address["port"], self.http_server.port


### 9km2/rm

In [11]:
# Open every file matching the chl*.nc pattern as one dataset concatenated
# along "time", restrict it by latitude label, then rechunk it.
da = (
    xr.open_mfdataset(
        '/media/silsbelab/LaCie/occci/v5/daily/9km2/log2/chl*.nc',
        parallel=True,
        concat_dim='time',
        engine='netcdf4',
        combine='nested',
    )
    # Label-based selection from 60 to -60; this presumes the lat coordinate
    # is stored in descending order -- TODO confirm against the input files.
    .sel(lat=slice(60, -60))
    # -1 puts the whole time axis in a single chunk; lat/lon are tiled 120x120.
    .chunk({'time': -1, 'lat': 120, 'lon': 120})
)
da

Unnamed: 0,Array,Chunk
Bytes,208.77 GB,483.26 MB
Shape,"(8390, 1440, 4320)","(8390, 120, 120)"
Count,64964 Tasks,432 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 208.77 GB 483.26 MB Shape (8390, 1440, 4320) (8390, 120, 120) Count 64964 Tasks 432 Chunks Type float32 numpy.ndarray",4320  1440  8390,

Unnamed: 0,Array,Chunk
Bytes,208.77 GB,483.26 MB
Shape,"(8390, 1440, 4320)","(8390, 120, 120)"
Count,64964 Tasks,432 Chunks
Type,float32,numpy.ndarray


### References: splitting a chunked xarray dataset and writing one NetCDF file per chunk
- https://stackoverflow.com/questions/63906769/writing-xarray-multiindex-data-in-chunks
- https://ncar.github.io/esds/posts/2020/writing-multiple-netcdf-files-in-parallel-with-xarray-and-dask/index.html

In [12]:
import itertools

def split_by_chunks(dataset):
    """Yield one sub-selection of ``dataset`` per combination of its chunks.

    For every dimension listed in ``dataset.chunks`` the chunk lengths are
    converted into consecutive ``slice(start, stop)`` objects. A slice whose
    start is at or beyond ``dataset.sizes[dim]`` ends the list for that
    dimension. The Cartesian product of the per-dimension slices is then
    walked, and for each combination ``dataset[{dim: slice, ...}]`` is yielded.

    Parameters
    ----------
    dataset
        Object exposing ``chunks`` (mapping of dimension name to chunk
        lengths), ``sizes`` (mapping of dimension name to length) and
        indexing by a ``{dim: slice}`` dict.

    Yields
    ------
    The result of indexing ``dataset`` with each combination of slices.
    """
    per_dim = {}
    for dim_name, chunk_lengths in dataset.chunks.items():
        # Running totals of the chunk lengths are the stop offsets; the start
        # offsets are the same sequence shifted right by one, beginning at 0.
        stops = list(itertools.accumulate(chunk_lengths))
        starts = [0] + stops[:-1]
        in_range = itertools.takewhile(
            lambda bounds: bounds[0] < dataset.sizes[dim_name],
            zip(starts, stops),
        )
        per_dim[dim_name] = [slice(lo, hi) for lo, hi in in_range]

    dim_names = list(per_dim)
    for combo in itertools.product(*per_dim.values()):
        yield dataset[dict(zip(dim_names, combo))]

def create_filepath(ds, prefix='g', root_path="."):
    """Build an output file path named after the minimum lat/lon of ``ds``.

    The path has the form ``{root_path}/{prefix}_{lat}_{lon}.nc``, where
    ``lat`` is the whole-number part of ``ds.lat.min()`` followed by ``S``
    (negative) or ``N``, and ``lon`` is the whole-number part of
    ``ds.lon.min()`` followed by ``W`` (negative) or ``E``.

    The hemisphere letter is decided from the untruncated value. Truncating
    first would turn any minimum in (-1, 0) into 0 and label it ``N``/``E``,
    which can give tiles on either side of the equator or prime meridian the
    same file name.

    Parameters
    ----------
    ds
        Object whose ``lat`` and ``lon`` attributes provide ``.min().values``.
    prefix : str
        Leading part of the file name.
    root_path : str
        Directory the file path is rooted at.

    Returns
    -------
    str
        The constructed file path.
    """
    def _label(value, negative_suffix, non_negative_suffix):
        # Choose the suffix before int() discards the sign of small negatives.
        suffix = negative_suffix if value < 0 else non_negative_suffix
        return f'{abs(int(value))}{suffix}'

    lat = _label(ds.lat.min().values, 'S', 'N')
    lon = _label(ds.lon.min().values, 'W', 'E')
    return f'{root_path}/{prefix}_{lat}_{lon}.nc'

In [13]:
# Collect every item yielded by split_by_chunks(da) into a list so the same
# sequence can be reused by later cells.
datasets = list(split_by_chunks(da))


In [14]:
# Build one output path per entry in `datasets`, in the same order, all under
# the chl_log2 regrid directory.
paths = [create_filepath(ds, root_path='/media/silsbelab/LaCie/occci/v5/regrid/chl_log2') for ds in datasets]


In [15]:
# Write each entry of `datasets` to the file at the matching position in
# `paths`.
xr.save_mfdataset(datasets=datasets, paths=paths)
