# CNAPS ReferenceFileSystem JSON
Create a ReferenceFileSystem JSON file for a collection of COAWST NetCDF files on S3.

In [None]:
import os
import fsspec
import ujson   # fast json
from kerchunk.hdf import SingleHdf5ToZarr 
from kerchunk.combine import MultiZarrToZarr, auto_dask, JustLoad
from pathlib import Path
import xarray as xr
import cf_xarray
import dask
import hvplot.xarray

#### Start a Dask Gateway cluster

In [None]:
import os
import sys

# Make the shared Nebari helper library importable.
lib_dir = os.path.join(os.environ['HOME'], 'shared', 'users', 'lib')
sys.path.append(lib_dir)
import nebari_utils as nbu

# OSN credentials profile, region and endpoint.
profile, region = 'osn-mghp', 'us-east-1'
endpoint_url = 'https://mghp.osn.xsede.org'
nbu.set_credentials(profile=profile, region=region, endpoint_url=endpoint_url)

# Fixed-size (non-adaptive) cluster of `worker_max` workers. propagate_env=True
# forwards environment variables to the workers (presumably including the
# credentials configured by set_credentials above — confirm in nebari_utils).
worker_max = 4
client, cluster = nbu.start_dask_cluster(
    profile=profile,
    worker_max=worker_max,
    region=region,
    use_existing_cluster=False,
    adaptive_scaling=False,
    wait_for_cluster=False,
    worker_profile='Small Worker',
    propagate_env=True,
)

In [None]:
#cluster.shutdown()

In [None]:
# Optional: switch the cluster to adaptive scaling (uncomment to enable).
#cluster.adapt(minimum=4, maximum=60)

In [None]:
fs_read = fsspec.filesystem('s3', anon=True, 
                            skip_instance_cache=True, 
                            use_listings_cache=False,
                            client_kwargs={'endpoint_url': 'https://mghp.osn.xsede.org'})

In [None]:
nc_list = fs_read.glob('s3://rsignellbucket1/jzambon/*_nc4.nc')
nc_list

In [None]:
print(nc_list[0])
print(nc_list[-1])

In [None]:
json_dir = 's3://rsignellbucket1/jzambon/jsons/'

In [None]:
json_list = fs_read.glob(f'{json_dir}*.json')
print(len(json_list))
print(json_list[0])
print(json_list[-1])

In [None]:
nc_processed_list = [j.split('.json')[0].replace('/jsons','') for j in json_list]
print(nc_processed_list[0])
print(nc_processed_list[-1])

In [None]:
nc_process_list = list(set(nc_list) - set(nc_processed_list))
print(len(nc_process_list))
if len(nc_process_list)>0:
    print(nc_process_list[0])
    print(nc_process_list[-1])

In [None]:
fs_write = fsspec.filesystem('s3', anon=False, 
        skip_instance_cache=True, client_kwargs={'endpoint_url': 'https://mghp.osn.xsede.org'})

In [None]:
#for f in flist[:10]:
#    fs.rm(f)

In [None]:
flist = sorted(['s3://'+f for f in nc_process_list])

In [None]:
fsize = [fs_read.size(f) for f in flist]

In [None]:
import pandas as pd
df = pd.DataFrame({'file': flist, 'size': fsize }).sort_values('size')

In [None]:
pd.options.display.max_colwidth=100

In [None]:
df.head()

Any zero-length or unusually small files indicate files that didn't transfer properly.
Run this notebook again after the transfer completes, and make sure all the NetCDF files look okay before creating the JSON files.

In [None]:
# Open/storage options used by gen_json when opening each source NetCDF file.
so = {
    'mode': 'rb',
    'anon': False,
    'skip_instance_cache': True,
    'client_kwargs': {'endpoint_url': 'https://mghp.osn.xsede.org'},
}

#### Create the individual JSON files directly on S3 

We passed AWS credentials to the Dask workers via environment variables above (`propagate_env=True`). The Dask workers don't have an AWS credentials file with profiles defined, so we don't specify a profile here. Instead, we just set `anon=False` and let the workers find the credentials in their environment variables:

In [None]:
#json_dir = 's3://esip/jzambon/testing/jsons/'

In [None]:
# This removes all the JSONs. We would only do this if we wanted to recreate everything.
#try:
#    fs.rm(json_dir, recursive=True)
#except:
#    pass

In [None]:
def gen_json(u):
    """Write a kerchunk reference JSON for one NetCDF/HDF5 file on S3.

    Parameters
    ----------
    u : str
        Full ``s3://`` URL of the source NetCDF file.

    Returns
    -------
    str
        Path of the JSON that was written: ``{json_dir}{stem}.nc.json``.
    """
    # `so` holds mode='rb' plus S3 storage options (anon=False, endpoint).
    # fsspec.open builds a filesystem from those options. Passing them to
    # fs_read.open would not reconfigure fs_read (which is anon=True); they
    # would instead be forwarded to the file object.
    with fsspec.open(u, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        refs = h5chunks.translate()  # reads from infile, so stay inside `with`
    outf = f'{json_dir}{Path(u).stem}.nc.json'
    print(outf)
    with fs_write.open(outf, 'wb') as f:
        f.write(ujson.dumps(refs).encode())
    return outf

In [None]:
# Display the sorted s3:// URLs that still need reference JSONs.
flist

#### Parallel creation of JSON for each file using Dask Futures

In [None]:
%%time
# Run gen_json on every pending file in parallel (one Dask future per file),
# then wait for all of them to finish.
futures = client.map(gen_json, flist)
client.gather(futures)

Check disk space used on OSN (TB)

In [None]:
%%time
# Space used under the bucket, divided by 1e12 to report terabytes.
fs_write.du('s3://rsignellbucket1')/1e12

In [None]:
json_list = fs_read.ls(json_dir)
json_list = sorted(['s3://'+f for f in json_list])

In [None]:
opts = dict(anon=True, client_kwargs={'endpoint_url': 'https://mghp.osn.xsede.org'}, skip_instance_cache=True)

In [None]:
import base64

import zarr
import numpy as np

def modify_metadata(out, fill_value=1.e+37):
    """Patch the zarr metadata of one kerchunk reference set.

    Sets ``fill_value`` on every 2-D, 3-D and 4-D array, and tags
    ``ocean_time`` with ``standard_name='time'`` so cf_xarray can select it
    as the CF time axis.

    Parameters
    ----------
    out : MutableMapping
        Zarr store for one file; presumably the reference dict handed to
        MultiZarrToZarr's ``preprocess`` hook. It is opened with zarr and
        updated through it.
    fill_value : float, optional
        Fill value assigned to the 2-D to 4-D arrays (default 1e37).

    Returns
    -------
    The same ``out`` object that was passed in.
    """
    g = zarr.open(out)
    for _, v in g.arrays():
        # Only 2-D to 4-D arrays are changed; other arrays keep their
        # existing fill value.
        if 2 <= len(v.shape) <= 4:
            v.fill_value = fill_value
    g.ocean_time.attrs['standard_name'] = 'time'
    return out
    
def postprocess(out):
    """Preprocess hook for MultiZarrToZarr: fix the metadata of one reference set."""
    return modify_metadata(out)

In [None]:
# Anonymous read options for the OSN endpoint (used for both JSONs and NetCDF chunks).
opts = {'anon': True, 'client_kwargs': {'endpoint_url': 'https://mghp.osn.xsede.org'}}

Combine the JSONs using the Dask cluster

In [None]:
# MultiZarrToZarr settings: concatenate along time, treat the lat/lon grids
# as identical across files, and fix metadata of each input first.
mzz_kwargs = {
    'concat_dims': ['ocean_time'],
    'identical_dims': [
        'lat_psi', 'lat_rho', 'lat_u', 'lat_v',
        'lon_psi', 'lon_rho', 'lon_u', 'lon_v',
    ],
    'preprocess': postprocess,
}

Update `json_list` with the newly created files and convert the paths to s3:// URLs

In [None]:
# Every per-file JSON, as s3:// URLs.
json_list = [f's3://{j}' for j in fs_read.glob(f'{json_dir}*.json')]

In [None]:
%%time
# Combine all per-file JSONs into one reference dict `d` on the Dask cluster.
# JustLoad is used as the single-file driver (presumably it just loads each
# existing JSON rather than re-scanning the NetCDF — confirm in kerchunk docs).
d = auto_dask(
    json_list,
    single_driver=JustLoad,
    single_kwargs={"storage_options": opts},
    mzz_kwargs=mzz_kwargs,
    n_batches=worker_max,   # give one batch to each worker
    remote_protocol="s3",
    remote_options=opts
)

Examine the resulting dataset

In [None]:
fs5 = fsspec.filesystem("reference", fo=d, target_options=opts,
                       remote_protocol='s3', remote_options=opts,
                       skip_instance_cache=True)
m = fs5.get_mapper("")

In [None]:
ds = xr.open_dataset(m, engine="zarr", chunks={'ocean_time':12}, 
                     backend_kwargs=dict(consolidated=False))

In [None]:
# Inspect the combined dataset.
ds

Use CF conventions to select times

In [None]:
# Last time step, selected through the CF 'time' axis (ocean_time was given
# standard_name='time' in modify_metadata).
ds.cf.isel(time=-1)

Write combined JSON to S3

In [None]:
# Destination for the combined reference JSON.
combined_json = 's3://rsignellbucket1/jzambon/archive.json'

In [None]:
%%time
# Serialize the combined references `d` and upload them.
with fs_write.open(combined_json, 'wb') as f:
    f.write(ujson.dumps(d).encode());

In [None]:
fs_write.size(combined_json)/1e6  # combined JSON size in MB

In [None]:
# Full object metadata for the uploaded JSON.
fs_write.info(combined_json)

#### Try opening the consolidated JSON file from S3

In [None]:
# Repeated from above for convenience, in case the notebook is started here.
# The path must match the one the combined JSON was written to above.
combined_json = 's3://rsignellbucket1/jzambon/archive.json'
opts = dict(anon=True, client_kwargs={'endpoint_url': 'https://mghp.osn.xsede.org'})

In [None]:
%%time
fs5 = fsspec.filesystem("reference", fo=combined_json, target_options=opts,
                       remote_protocol='s3', remote_options=opts,
                       skip_instance_cache=True)
m = fs5.get_mapper("")
ds = xr.open_dataset(m, engine="zarr", chunks={}, 
                     backend_kwargs=dict(consolidated=False))

In [None]:
#fs5 = fsspec.filesystem("reference", fo=json_list[-1], target_options=opts,
#                       remote_protocol='s3', remote_options=opts)
#m = fs5.get_mapper("")
#ds = xr.open_dataset(m, engine="zarr", chunks={'ocean_time':12}, 
#                     backend_kwargs=dict(consolidated=False))

In [None]:
# Inspect the `salt` variable (opened with chunks={}, so no data is loaded yet).
ds.salt

In [None]:
%%time
# Load the last 10 time steps of `temp` at the last index of axis 1
# (presumably the vertical level, so -1 is the surface — confirm).
da = ds['temp'][-10:,-1,:,:].load()

In [None]:
lon_name = da.cf['longitude'].name
lat_name = da.cf['latitude'].name

da.hvplot.quadmesh(x=lon_name, y=lat_name, geo=True, cmap='turbo', tiles='OSM', rasterize=True)

In [None]:
da[:,150,150].hvplot(x='ocean_time', grid=True)

In [None]:
# Release the Dask client and shut down the cluster.
client.close()
cluster.shutdown()