In [None]:
import xarray as xr
import dask
import ujson
import s3fs
import fsspec
from glob import glob
import pathlib
import hvplot.xarray

from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr

# Opening a dataset from a reference file raises an xarray SerializationWarning
# for each variable (cause still to be explored). Silence only that category
# so that any other warning remains visible.
import warnings
warnings.filterwarnings("ignore", category=xr.SerializationWarning)

## Start up a Dask Client to monitor Dask processing

In [None]:
from dask.distributed import Client

# Start a Dask client with a fixed number of workers; leaving `client` as the
# last expression renders its summary in the notebook.
N_WORKERS = 4
client = Client(n_workers=N_WORKERS)
client

## Identify the S3 bucket

In [None]:
# S3 bucket URL; the glob further down searches it for *.nc4.json reference files.
bucket = 's3://npwbanalres'

## Connect to the S3 bucket and list files it contains

In [None]:
# anon=False: connect with configured AWS credentials rather than anonymously.
s3 = s3fs.S3FileSystem(anon=False)

In [None]:
# To list every object in the bucket instead, uncomment:
# s3.ls(bucket)

# Collect the paths of all objects in the bucket matching *.nc4.json.
reference_pattern = f'{bucket}/*.nc4.json'
reference_list = s3.glob(reference_pattern)
reference_list

**Use Dask to process multiple files in parallel**

In [None]:
# NOTE(review): `rain_urls` and `gen_json` are not defined in any visible cell
# of this notebook; on a fresh kernel this cell raises NameError unless they
# are defined first — confirm where they are supposed to come from.

# Build one lazy gen_json task per URL, then run all of them through Dask.
reference_files = [dask.delayed(gen_json)(url) for url in rain_urls]
reference_files_compute = dask.compute(*reference_files)

---

## Working with Kerchunk files

### Read a single Kerchunk file

In [None]:
# Use the first path in reference_list as the single-file example.
reference_file = reference_list[0]
reference_file

In [None]:
# Options passed as ref_storage_args when opening a reference file:
# skip fsspec's filesystem instance cache.
s_opts = dict(skip_instance_cache=True)

# Options passed as remote_options for the S3 data the references point to:
# authenticated (non-anonymous) access.
r_opts = dict(anon=False)

In [None]:
# Build the full s3:// URL of the reference JSON, then open it as an fsspec
# "reference" filesystem whose referenced data is read from S3.
single_ref_url = f's3://{reference_file}'
fs_single = fsspec.filesystem(
    "reference",
    fo=single_ref_url,
    ref_storage_args=s_opts,
    remote_protocol='s3',
    remote_options=r_opts,
)

In [None]:
%%time
# Expose the root of the reference filesystem as a mapper for the zarr engine.
m = fs_single.get_mapper("")

# consolidated=False: do not look for consolidated zarr metadata.
# chunks={}: load the variables as dask arrays.
zarr_kwargs = {'consolidated': False}
ds_single = xr.open_dataset(m, engine="zarr", backend_kwargs=zarr_kwargs, chunks={})
ds_single

### Read multiple Kerchunk files

In [None]:
# Keep only the reference paths containing 'rain.nc4.json'; preview the first ten.
rf_rain = list(filter(lambda path: 'rain.nc4.json' in path, reference_list))
rf_rain[:10]

In [None]:
# Disabled draft helper: builds a reference filesystem for one reference file
# and returns its root mapper. It is referenced only by the commented-out
# Dask experiment in the next cell.
# def mapper(reference_file, s_opts, r_opts):
#     fs = fsspec.filesystem("reference",
#                            fo=f's3://{reference_file}',
#                            ref_storage_args=s_opts,
#                            remote_protocol='s3',
#                            remote_options=r_opts)
#     m = fs.get_mapper("")
#     return m

In [None]:
# NOTE(review): disabled experiment. It wraps mapper() in dask.delayed but then
# hands the resulting Delayed object straight to xr.open_dataset — presumably
# the reason it was abandoned; confirm before reviving it.
# %%time
# ds_k =[]
# for ref in rf_rain:
#     s_opts = s_opts
#     r_opts = r_opts
#     m = dask.delayed(mapper)(ref, s_opts, r_opts)
#     ds_k.append(xr.open_dataset(m, engine="zarr", backend_kwargs={'consolidated':False}, chunks={}))

# ds_k_compute = dask.compute(*ds_k)

In [None]:
%%time
# Open every rain reference file as its own dataset. s_opts / r_opts are the
# option dicts defined for the single-file example and are reused unchanged.
ds_k = []
for ref in rf_rain:
    # One "reference" filesystem per Kerchunk JSON; the referenced data is read from S3.
    fs = fsspec.filesystem("reference",
                           fo=f's3://{ref}',
                           ref_storage_args=s_opts,
                           remote_protocol='s3',
                           remote_options=r_opts)
    m = fs.get_mapper("")
    ds_k.append(xr.open_dataset(m, engine="zarr", backend_kwargs={'consolidated':False}, chunks={}))

In [None]:
%%time
# Join the per-file datasets along the time dimension, in the order of ds_k.
ds_multi = xr.concat(ds_k, dim="time")
ds_multi

In [None]:
# Label-based selection of a time window from the combined dataset.
# NOTE(review): assumes the concatenated time coordinate is sorted — TODO confirm.
time_window = slice('2001-12-01', '2002-03-01')
t_slice = ds_multi.sel(time=time_window)
t_slice