In [1]:
import boto3
import dask.bag as db
from dask.distributed import Client
from dask_gateway import Gateway, GatewayCluster
import fsspec
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr
from kerchunk.netCDF3 import NetCDF3ToZarr
import s3fs
import xarray as xr

In [2]:
# Authenticated (anon=False) S3 filesystem used below to list and open files
s3fsfs = s3fs.S3FileSystem(
    anon=False,
)

In [3]:
s3_file_directory = 's3://cmip6-staging/Sample/FWI/Monthly/MME/'
# s3fs.ls() returns keys without the "s3://" scheme. Keep only NetCDF files (so a
# stray non-.nc object never reaches the HDF5 translator) and sort the list so
# flist[0], flist[-1] and flist[0:10] select the same files on every run.
flist = sorted(
    f"s3://{key}"
    for key in s3fsfs.ls(s3_file_directory)
    if key.endswith(".nc")
)

In [5]:
#flist

In [24]:
# Open one file directly from S3 to inspect its structure and encoding.
# Pin the h5netcdf engine (the same one test_access uses) rather than relying on
# xarray's backend auto-detection for a file-like object.
ds = xr.open_dataset(s3fsfs.open(flist[-1]), engine="h5netcdf")
ds

In [27]:
# 'chunksizes' is None in the output below: FWI is not chunked on disk in these files
fwi_encoding = ds["FWI"].encoding
fwi_encoding

{'chunksizes': None,
 'fletcher32': False,
 'shuffle': False,
 'source': '<File-like object S3FileSystem, cmip6-staging/Sample/FWI/Monthly/MME/MME50_ssp245_fwi_metrics_monthly_2100.nc>',
 'original_shape': (12, 600, 1440),
 'dtype': dtype('<f4'),
 '_FillValue': nan}

In [15]:
# Create a kerchunk reference for one file
def gen_ref(f):
    """Translate one HDF5-based file (e.g. NetCDF4) into a kerchunk reference set.

    Parameters
    ----------
    f : str
        URL of the file to index, e.g. ``"s3://bucket/path/file.nc"``. It is
        opened with fsspec and also passed to ``SingleHdf5ToZarr`` as the URL
        recorded for the file's chunks.

    Returns
    -------
    dict
        The result of ``SingleHdf5ToZarr(...).translate()``. ``inline_threshold=0``
        disables inlining of small chunks into the reference.
    """
    # anon=False: read with the caller's AWS credentials, not anonymously.
    with fsspec.open(f, mode="rb", anon=False) as remote_file:
        translator = SingleHdf5ToZarr(remote_file, f, inline_threshold=0)
        refs = translator.translate()
    return refs


reference = gen_ref(flist[0])

In [16]:
# Open the single-file reference via fsspec's "reference://" protocol and the zarr engine
single_file_backend_kwargs = {
    "storage_options": {"fo": reference},
    "consolidated": False,
}
ds = xr.open_dataset(
    "reference://",
    engine="zarr",
    decode_coords="all",
    backend_kwargs=single_file_backend_kwargs,
)

In [17]:
ds  # dataset opened through the single-file kerchunk reference

# Test S3 access from the Dask workers

In [7]:
# Launch the cluster through the Gateway client created here, so `gateway` is the
# object that actually owns the cluster instead of sitting unused.
# shutdown_on_close=True: shut the cluster down when this handle is closed.
gateway = Gateway()
cluster = gateway.new_cluster(shutdown_on_close=True)

In [8]:
# Connect a distributed Client to the gateway cluster; the bare `client`
# expression renders its summary (connection method, dashboard link).
client = cluster.get_client()
client

0,1
Connection method: Cluster object,Cluster type: dask_gateway.GatewayCluster
Dashboard: /services/dask-gateway/clusters/prod.d4949d7b3ea24f209a335a0a1a52f7a7/status,


In [9]:
# cluster.scale() only requests workers; block until they have joined so the
# client.run() access check and the %time measurements below run on a full
# cluster rather than on fewer (possibly zero) workers.
N_WORKERS = 4
cluster.scale(N_WORKERS)
client.wait_for_workers(N_WORKERS)

In [10]:
def test_access(f):
    """Open one remote NetCDF file with xarray as an S3 access check.

    Meant to be sent to the Dask workers with ``client.run`` so each worker
    shows it can read from S3 via the module-level ``s3fsfs`` filesystem.

    Parameters
    ----------
    f : str
        S3 URL of the file to open.

    Returns
    -------
    xarray.Dataset
        The dataset opened with the h5netcdf engine and CF decoding enabled.
        It wraps the file object from ``s3fsfs.open``, which is not closed here.
    """
    handle = s3fsfs.open(f)
    return xr.open_dataset(handle, engine="h5netcdf", decode_cf=True)

In [11]:
# Run the access check on the connected workers (outside the task graph).
result = client.run(test_access, flist[0])
# An empty result means no worker executed the check, so it proved nothing.
assert result, "client.run returned no results: no Dask workers are connected"

In [13]:
#print(result)

# Generate per-file kerchunk references in parallel on Dask

In [18]:
# Estimate time for 10 files
bag = db.from_sequence(flist[0:10], npartitions=10).map(gen_ref)
%time dicts = bag.compute()

CPU times: user 140 ms, sys: 79 ms, total: 219 ms
Wall time: 11.9 s


In [19]:
bag = db.from_sequence(flist).map(gen_ref)
%time dicts = bag.compute()

CPU times: user 249 ms, sys: 7.79 ms, total: 256 ms
Wall time: 2min 15s


In [20]:
# Every input file must yield exactly one reference dict before combining.
assert len(dicts) == len(flist), f"expected {len(flist)} references, got {len(dicts)}"
len(dicts)

151

# Generate Combined Kerchunk Reference

In [21]:
# Combine the per-file references into one virtual dataset concatenated along
# "time"; the "cf:time" mapping derives each file's time values from its CF
# time coordinate. Chunks are read back from S3 with credentials (anon=False).
# NOTE(review): no identical_dims are given for the non-time coordinates —
# confirm kerchunk's default handling of them is what is intended.
mzz = MultiZarrToZarr(
    dicts,
    concat_dims=["time"],
    coo_map={"time": "cf:time"},
    remote_protocol="s3",
    remote_options={"anon": False},
    inline_threshold=0,
)

In [32]:
# Write the combined reference set to a local JSON file (uploaded to S3 further down).
combined_kerchunk = 'fwi-kerchunk-reference.json'
%time mzz.translate(combined_kerchunk)

CPU times: user 3.17 ms, sys: 0 ns, total: 3.17 ms
Wall time: 30.9 ms


In [33]:
# Open the combined JSON reference as a lazily-chunked dataset; chunk URLs are
# resolved with the s3 protocol, and skip_instance_cache forces a fresh
# reference filesystem instead of a cached one.
fs = fsspec.filesystem(
    "reference",
    fo=combined_kerchunk,
    remote_protocol="s3",
    skip_instance_cache=True,
)
combined_mapper = fs.get_mapper("")
ds = xr.open_dataset(
    combined_mapper,
    engine="zarr",
    consolidated=False,
    chunks="auto",
)
ds

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 5.83 GiB 118.65 MiB Shape (1812, 600, 1440) (36, 600, 1440) Dask graph 51 chunks in 2 graph layers Data type float32 numpy.ndarray",1440  600  1812,

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 5.83 GiB 118.65 MiB Shape (1812, 600, 1440) (36, 600, 1440) Dask graph 51 chunks in 2 graph layers Data type float32 numpy.ndarray",1440  600  1812,

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 5.83 GiB 118.65 MiB Shape (1812, 600, 1440) (36, 600, 1440) Dask graph 51 chunks in 2 graph layers Data type float32 numpy.ndarray",1440  600  1812,

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 5.83 GiB 118.65 MiB Shape (1812, 600, 1440) (36, 600, 1440) Dask graph 51 chunks in 2 graph layers Data type float32 numpy.ndarray",1440  600  1812,

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 5.83 GiB 118.65 MiB Shape (1812, 600, 1440) (36, 600, 1440) Dask graph 51 chunks in 2 graph layers Data type float32 numpy.ndarray",1440  600  1812,

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 5.83 GiB 118.65 MiB Shape (1812, 600, 1440) (36, 600, 1440) Dask graph 51 chunks in 2 graph layers Data type float32 numpy.ndarray",1440  600  1812,

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 5.83 GiB 118.65 MiB Shape (1812, 600, 1440) (36, 600, 1440) Dask graph 51 chunks in 2 graph layers Data type float32 numpy.ndarray",1440  600  1812,

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 5.83 GiB 118.65 MiB Shape (1812, 600, 1440) (36, 600, 1440) Dask graph 51 chunks in 2 graph layers Data type float32 numpy.ndarray",1440  600  1812,

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 5.83 GiB 118.65 MiB Shape (1812, 600, 1440) (36, 600, 1440) Dask graph 51 chunks in 2 graph layers Data type float32 numpy.ndarray",1440  600  1812,

Unnamed: 0,Array,Chunk
Bytes,5.83 GiB,118.65 MiB
Shape,"(1812, 600, 1440)","(36, 600, 1440)"
Dask graph,51 chunks in 2 graph layers,51 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [30]:
# Upload destination for the combined reference JSON, plus a boto3 S3 client
s3 = boto3.client("s3")
bucket = "nasa-eodc-zarrs"
directory = "ames_research_center_fwi_monthly"

In [34]:
# S3.Client.upload_file returns None (it raises on failure), so its return value
# carries no information. Upload, then confirm the object exists with
# head_object, whose response holds the stored object's metadata.
reference_key = f"{directory}/{combined_kerchunk}"
s3.upload_file(combined_kerchunk, bucket, reference_key)
response = s3.head_object(Bucket=bucket, Key=reference_key)
response["ContentLength"]

# WIP: Kerchunk a CMIP6 (NetCDF3) file

In [None]:
# Sample CMIP6 ensemble-average file on S3, and its bare filename
cmip6_file = "s3://cmip6-staging/Sample/ens-avg/tasmax_decadal_monthly_ens_avg_ssp245.nc"
file = cmip6_file.rsplit("/", 1)[-1]

In [None]:
# Download the sample file into the current working directory as `file`, using
# the same authenticated s3fs filesystem as the rest of the notebook instead of
# the AWS CLI with a second, hard-coded copy of the URL.
s3fsfs.get(cmip6_file, file)

In [None]:
# Index the S3 object itself rather than the local download: the references then
# point at s3:// URLs, matching remote_protocol='s3' used when they are opened,
# and they stay usable from any machine with access to the bucket.
reference = NetCDF3ToZarr(
    cmip6_file,
    storage_options={"anon": False},
    inline_threshold=0,
).translate()

In [None]:
# Open the CMIP6 reference through fsspec's reference filesystem; chunk URLs are
# resolved with the s3 protocol. decode_times=False leaves time values undecoded.
fs = fsspec.filesystem(
    "reference",
    fo=reference,
    remote_protocol="s3",
    skip_instance_cache=True,
)
cmip6_mapper = fs.get_mapper("")
ds = xr.open_dataset(
    cmip6_mapper,
    engine="zarr",
    consolidated=False,
    decode_times=False,
)
ds