In [1]:
# Enable IPython's autoreload extension in mode 2 so that edits to imported
# modules are picked up without restarting the kernel
%load_ext autoreload
%autoreload 2

In [2]:
import climtas
import xarray
import glob
import dask

In [3]:
import os
import dask.distributed

# Edit as desired
threads_per_worker = 1

try:
    c # Already running - re-running this cell reuses the existing client
except NameError:
    # PBS_NCPUS / PBS_JOBFS are read from the environment (set by the batch
    # system inside a PBS job). Fall back to the local CPU count and Dask's
    # default scratch directory so the notebook also starts outside a job.
    ncpus = int(os.environ.get('PBS_NCPUS') or os.cpu_count() or 1)
    jobfs = os.environ.get('PBS_JOBFS')

    c = dask.distributed.Client(
        # Always start at least one worker, even if threads_per_worker > ncpus;
        # a cluster with zero workers would leave every computation hanging
        n_workers=max(1, ncpus // threads_per_worker),
        threads_per_worker=threads_per_worker,
        # 4 GB of memory per thread
        memory_limit=f'{4*threads_per_worker}gb',
        # Worker spill files go to job-local storage when it is available
        local_directory=(os.path.join(jobfs, 'dask-worker-space')
                         if jobfs else None),
    )
c

0,1
Client  Scheduler: tcp://127.0.0.1:33965  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 16.00 GB


In [4]:
# Collect the input files in sorted order so the nested concatenation along
# 'time' follows the filename ordering
sm_pct_pattern = '/g/data/w35/dh4185/data/AWAP/awra_sm_pct/sm_pct_*.nc'
sm_pct_files = sorted(glob.glob(sm_pct_pattern))

# Chunk only the horizontal dimensions; 'time' is left unspecified
sm_pct_chunks = {'latitude': 20, 'longitude': 100}

ds = xarray.open_mfdataset(
    sm_pct_files,
    combine='nested',
    concat_dim='time',
    chunks=sm_pct_chunks,
)
ds

In [5]:
# Report the chunk count, chunk size and graph size of the input variable
climtas.helpers.dask_report(ds['sm_pct'])

Chunk Count: 34335
Chunk Size: 2.93 MB
Graph Size: 68779


In [6]:
# Rank the soil moisture values with climtas' day-of-year ranking; the result
# is displayed below without being computed here
r = climtas.rank.rank_by_dayofyear(ds['sm_pct'])
r

In [7]:
# Report the chunk count, chunk size and graph size of the ranked result, for
# comparison with the report on the input above
climtas.helpers.dask_report(r)

Chunk Count: 315
Chunk Size: 317.10 MB
Graph Size: 70984


In [8]:
# Load only the first element along each of the three dimensions
# NOTE(review): presumably a smoke test that the ranking graph computes - confirm
r[0,0,0].load()

In [9]:
%%time
# NOTE(review): disabled call to the library implementation, so this cell only
# times an empty body; the local to_netcdf_chunkwise defined later in this
# notebook is called instead
#climtas.io.to_netcdf_chunkwise(r, '~/scratch/test.nc')


CPU times: user 1e+03 ns, sys: 1 µs, total: 2 µs
Wall time: 5.48 µs


In [10]:
# Show the worker processes (Nannies) managed by the client's cluster
c.cluster.workers

{0: <Nanny: tcp://127.0.0.1:33615, threads: 1>,
 1: <Nanny: tcp://127.0.0.1:34435, threads: 1>,
 2: <Nanny: tcp://127.0.0.1:45825, threads: 1>,
 3: <Nanny: tcp://127.0.0.1:37285, threads: 1>}

In [11]:
def to_netcdf_chunkwise(da, path, complevel=4, client=None):
    """
    Save a DataArray to file by calculating each chunk separately (rather than
    submitting the whole Dask graph at once). This may be helpful when chunks
    are large, e.g. doing an operation on dayofyear grouping for a long timeseries.

    Args:
        da: Dask-backed xarray.DataArray to save. Its name is used as the
            variable name in the output file.
        path: Path of the NetCDF file to write
        complevel: zlib compression level applied to the output variable
        client: dask.distributed.Client to run the computation on. Defaults to
            the currently active client (dask.distributed.get_client()).

    Raises:
        ValueError: if no client is given and no Dask client is active
    """
    ds = xarray.Dataset({da.name: da})

    # Compress the variable and align the file's chunking with the Dask chunks
    encoding = {da.name: {
        'zlib': True,
        'shuffle': True,
        'complevel': complevel,
        'chunksizes': da.data.chunksize,
        }}

    # compute=False gives back a delayed write whose graph is walked below
    # instead of being submitted in one go
    f = ds.to_netcdf(path, encoding=encoding, compute=False)

    if client is None:
        # Use whichever client is active rather than relying on a notebook global
        client = dask.distributed.get_client()

    # Run each of the save operations one at a time, then finalize
    old_graph = f.__dask_graph__()
    new_graph = {}
    for k, v in old_graph.items():
        # Only tuple tasks can be store_chunk calls; the tuple check avoids
        # indexing into graph values that are not subscriptable
        if isinstance(v, tuple) and v and v[0] is dask.array.core.store_chunk:
            # Reduce the graph to what this one store task needs, run it to
            # completion, and keep its result in place of the task
            opt, _ = dask.optimization.cull(old_graph, k)
            opt, _ = dask.optimization.fuse(opt, k)
            new_graph[k] = client.get(opt, k, sync=True)
        else:
            new_graph[k] = v

    # Finalise: with every store task already replaced by its result, run what
    # remains of the graph for the delayed write's output keys
    final_keys = list(f.__dask_layers__())
    opt, _ = dask.optimization.cull(new_graph, final_keys)
    opt, _ = dask.optimization.fuse(opt, final_keys)
    client.get(opt, final_keys, sync=True)


In [12]:
%%time

# Time the chunk-by-chunk save of the ranked array to a scratch file
to_netcdf_chunkwise(r, '~/scratch/test.nc')




KeyboardInterrupt: 

