In [1]:
import os
import argparse
import logging

import xarray as xr
from rechunker import rechunk
import zarr
#import cmdline_provenance as cmdprov

In [2]:
def define_target_chunks(ds, var):
    """Create a target chunks dictionary."""

    chunks = {'time': len(ds['time']), 'lat': 1, 'lon': 1}
    target_chunks_dict = {var: chunks}
    variables = list(ds.keys())
    variables.remove(var)
    coords = list(ds.coords.keys())
    for name in coords + variables:
        target_chunks_dict[name] = None

    return target_chunks_dict


def drop_vars(ds):
    """Drop unwanted variables"""

    for var in ['height', 'lat_bnds', 'lon_bnds', 'time_bnds']:
        try:
            ds = ds.drop_vars(var)
        except ValueError:
            pass

    return ds

In [54]:
var = 'kbdi'
if var == 'kbdi':
    path = '/g/data/xv83/dbi599/treasury'
else:
    path = f'/g/data/fs38/publications/CMIP6/CMIP/CSIRO/ACCESS-ESM1-5/historical/r1i1p1f1/day/{var}/gn/latest'

infiles = [
    f'{path}/{var}_day_ACCESS-ESM1-5_historical_r1i1p1f1_gn_19500101-19991231.nc',
    f'{path}/{var}_day_ACCESS-ESM1-5_historical_r1i1p1f1_gn_20000101-20141231.nc',
    f'{path}/{var}_day_ACCESS-ESM1-5_ssp370_r1i1p1f1_gn_20150101-20641231.nc',
    f'{path}/{var}_day_ACCESS-ESM1-5_ssp370_r1i1p1f1_gn_20650101-21001231.nc',
]      

temporal_zarr = f'/g/data/xv83/dbi599/treasury/{var}_day_ACCESS-ESM1-5_ssp370_r1i1p1f1_gn_19500101-21001231_temporal.zarr'
spatial_zarr = f'/g/data/xv83/dbi599/treasury/{var}_day_ACCESS-ESM1-5_ssp370_r1i1p1f1_gn_19500101-21001231_spatial.zarr'
temp_zarr = '/g/data/xv83/dbi599/treasury/temp.zarr'

In [58]:
infiles

['/g/data/xv83/dbi599/treasury/kbdi_day_ACCESS-ESM1-5_historical_r1i1p1f1_gn_19500101-19991231.nc',
 '/g/data/xv83/dbi599/treasury/kbdi_day_ACCESS-ESM1-5_historical_r1i1p1f1_gn_20000101-20141231.nc',
 '/g/data/xv83/dbi599/treasury/kbdi_day_ACCESS-ESM1-5_ssp370_r1i1p1f1_gn_20150101-20641231.nc',
 '/g/data/xv83/dbi599/treasury/kbdi_day_ACCESS-ESM1-5_ssp370_r1i1p1f1_gn_20650101-21001231.nc']

In [56]:
filevar = 'KBDI' if var == 'kbdi' else var

In [59]:
filevar

'KBDI'

In [37]:
if os.path.isdir(temporal_zarr):
    clean_up_command = f'rm -r {temporal_zarr}'
    print(clean_up_command)
    os.system(clean_up_command)

In [38]:
if os.path.isdir(spatial_zarr):
    clean_up_command = f'rm -r {spatial_zarr}'
    print(clean_up_command)
    os.system(clean_up_command)

In [6]:
from dask.distributed import Client, LocalCluster
#cluster = LocalCluster()
#client = Client(cluster)

client = Client()
client

#dask.diagnostics.ProgressBar().register()

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 7
Total threads: 28,Total memory: 125.19 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:34249,Workers: 7
Dashboard: http://127.0.0.1:8787/status,Total threads: 28
Started: Just now,Total memory: 125.19 GiB

0,1
Comm: tcp://127.0.0.1:35869,Total threads: 4
Dashboard: http://127.0.0.1:35229/status,Memory: 17.88 GiB
Nanny: tcp://127.0.0.1:44495,
Local directory: /jobfs/152018362.gadi-pbs/dask-scratch-space/worker-vu7o5zgr,Local directory: /jobfs/152018362.gadi-pbs/dask-scratch-space/worker-vu7o5zgr

0,1
Comm: tcp://127.0.0.1:42565,Total threads: 4
Dashboard: http://127.0.0.1:36733/status,Memory: 17.88 GiB
Nanny: tcp://127.0.0.1:44787,
Local directory: /jobfs/152018362.gadi-pbs/dask-scratch-space/worker-h_qq4eq3,Local directory: /jobfs/152018362.gadi-pbs/dask-scratch-space/worker-h_qq4eq3

0,1
Comm: tcp://127.0.0.1:37219,Total threads: 4
Dashboard: http://127.0.0.1:33571/status,Memory: 17.88 GiB
Nanny: tcp://127.0.0.1:40933,
Local directory: /jobfs/152018362.gadi-pbs/dask-scratch-space/worker-iexwfrh9,Local directory: /jobfs/152018362.gadi-pbs/dask-scratch-space/worker-iexwfrh9

0,1
Comm: tcp://127.0.0.1:34825,Total threads: 4
Dashboard: http://127.0.0.1:35103/status,Memory: 17.88 GiB
Nanny: tcp://127.0.0.1:33115,
Local directory: /jobfs/152018362.gadi-pbs/dask-scratch-space/worker-ha6xeafd,Local directory: /jobfs/152018362.gadi-pbs/dask-scratch-space/worker-ha6xeafd

0,1
Comm: tcp://127.0.0.1:35321,Total threads: 4
Dashboard: http://127.0.0.1:41685/status,Memory: 17.88 GiB
Nanny: tcp://127.0.0.1:36559,
Local directory: /jobfs/152018362.gadi-pbs/dask-scratch-space/worker-mzmwcpml,Local directory: /jobfs/152018362.gadi-pbs/dask-scratch-space/worker-mzmwcpml

0,1
Comm: tcp://127.0.0.1:33493,Total threads: 4
Dashboard: http://127.0.0.1:36867/status,Memory: 17.88 GiB
Nanny: tcp://127.0.0.1:40557,
Local directory: /jobfs/152018362.gadi-pbs/dask-scratch-space/worker-tp_xsj6o,Local directory: /jobfs/152018362.gadi-pbs/dask-scratch-space/worker-tp_xsj6o

0,1
Comm: tcp://127.0.0.1:41285,Total threads: 4
Dashboard: http://127.0.0.1:46513/status,Memory: 17.88 GiB
Nanny: tcp://127.0.0.1:34517,
Local directory: /jobfs/152018362.gadi-pbs/dask-scratch-space/worker-gi07j3ja,Local directory: /jobfs/152018362.gadi-pbs/dask-scratch-space/worker-gi07j3ja


In [57]:
ds = xr.open_mfdataset(infiles, preprocess=drop_vars)
coords = list(ds.coords)
chunks = ds[filevar].encoding['chunksizes']
input_chunks = {}
for coord, chunk in zip(coords, chunks):
    input_chunks[coord] = chunk
ds = ds.chunk(input_chunks)
for dvar in ds.variables:
    ds[dvar].encoding = {}

TypeError: 'NoneType' object is not iterable

In [40]:
ds

Unnamed: 0,Array,Chunk
Bytes,5.72 GiB,108.75 kiB
Shape,"(55152, 145, 192)","(1, 145, 192)"
Dask graph,55152 chunks in 10 graph layers,55152 chunks in 10 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 5.72 GiB 108.75 kiB Shape (55152, 145, 192) (1, 145, 192) Dask graph 55152 chunks in 10 graph layers Data type float32 numpy.ndarray",192  145  55152,

Unnamed: 0,Array,Chunk
Bytes,5.72 GiB,108.75 kiB
Shape,"(55152, 145, 192)","(1, 145, 192)"
Dask graph,55152 chunks in 10 graph layers,55152 chunks in 10 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [41]:
#ds.to_zarr(temporal_zarr)

In [42]:
#zarr.consolidate_metadata(temporal_zarr)

In [43]:
#source_group = zarr.open(temporal_zarr)

In [44]:
#print(source_group.tree())

In [45]:
#source_array = source_group[var]

In [46]:
#source_array.info

In [47]:
target_chunks_dict = define_target_chunks(ds, filevar)

In [48]:
target_chunks_dict

{'pr': {'time': 55152, 'lat': 1, 'lon': 1},
 'time': None,
 'lat': None,
 'lon': None}

In [49]:
group_plan = rechunk(
    ds,
#    source_group,
    target_chunks_dict,
    '15GB',
    spatial_zarr,
    temp_store=temp_zarr
)

In [50]:
group_plan.execute()

<zarr.hierarchy.Group '/'>

In [51]:
zarr.consolidate_metadata(spatial_zarr)

<zarr.hierarchy.Group '/'>

In [52]:
clean_up_command = f'rm -r {temp_zarr}'
os.system(clean_up_command)

0