# Create reference files for the COAWST forecast archive on AWS Open Data
We use [kerchunk](https://fsspec.github.io/kerchunk/) to create an individual JSON reference file for each weekly NetCDF file, 
then combine them into a single reference set (stored as Parquet) that allows access to the entire collection as a single dataset in Xarray.

In [None]:
import fsspec
import xarray as xr

from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr
from fsspec.implementations.reference import LazyReferenceMapper

from dask.distributed import Client
import dask.bag as db
from pathlib import Path
import numpy as np
import ujson

We can read from AWS Open Data using `anon=True`:

In [None]:
# Anonymous S3 filesystem for reading; instance and listing caches are disabled.
fs_read = fsspec.filesystem(
    's3',
    anon=True,
    skip_instance_cache=True,
    use_listings_cache=False,
)

We can't *write* to AWS Open Data without credentials, which we will specify through environment variables.  Because we are going to use environment variables instead of referencing an AWS profile, we don't specify `profile=` here in fs_write, but use `anon=False`:

In [None]:
# Non-anonymous S3 filesystem for writing; instance and listing caches are disabled.
fs_write = fsspec.filesystem(
    's3',
    anon=False,
    skip_instance_cache=True,
    use_listings_cache=False,
)

In [None]:
# S3 prefix that receives one reference JSON per NetCDF file.
json_dir = 's3://usgs-coawst/useast-archive/individual_jsons'
# Every NetCDF file directly under the archive prefix.
flist = fs_read.glob('s3://usgs-coawst/useast-archive/*.nc')

In [None]:
# Prepend the s3:// protocol to each entry of the glob result.
flist = [f's3://{key}' for key in flist]

In [None]:
# Quick sanity check: file count, then the first and last URLs.
print(len(flist))
first_url, last_url = flist[0], flist[-1]
print(first_url)
print(last_url)

#### Create references for each NetCDF file in parallel 

In [None]:
# Keyword arguments passed to fs_read.open() in gen_json.
so = {'mode': 'rb', 'anon': True, 'skip_instance_cache': True}

In [None]:
def gen_json(u):
    """Create the kerchunk reference JSON for one NetCDF file.

    The file at ``u`` is opened through ``fs_read`` with the ``so`` options and
    scanned with ``SingleHdf5ToZarr`` (``inline_threshold=300``). The
    translated references are serialized with ``ujson`` and written through
    ``fs_write`` to ``{json_dir}/<stem of u>.json``.

    Parameters
    ----------
    u : str
        URL of the NetCDF file to scan.

    Returns
    -------
    str
        URL of the JSON reference file that was written.
    """
    outf = f'{json_dir}/{Path(u).stem}.json'
    with fs_read.open(u, **so) as infile:
        refs = SingleHdf5ToZarr(infile, u, inline_threshold=300).translate()
    # Serialize before opening the output, so a failure while scanning or
    # encoding never leaves an empty or partial JSON object in json_dir.
    payload = ujson.dumps(refs).encode()
    with fs_write.open(outf, 'wb') as f:
        f.write(payload)
    return outf

In [None]:
import os
import sys

# Make the shared helper library importable before importing nebari_tools.
sys.path.append(
    os.path.join(os.environ['HOME'], 'shared', 'users', 'lib')
)
import nebari_tools as nbt

aws_profile = 'coawst_open_data'
aws_region = 'us-west-2'
# NOTE(review): no URL scheme here -- confirm nbt.set_credentials expects a bare host name.
endpoint_url = f's3.{aws_region}.amazonaws.com'
nbt.set_credentials(
    profile=aws_profile,
    region=aws_region,
    endpoint_url=endpoint_url,
)

In [None]:
# Upper bound on workers; also used to size the dask bag partitions.
worker_max = 30
client, cluster = nbt.start_dask_cluster(
    profile=aws_profile,
    worker_max=worker_max,
    region=aws_region,
    use_existing_cluster=False,
    adaptive_scaling=False,
    wait_for_cluster=False,
    worker_profile='Small Worker',
    propagate_env=True,
)

In [None]:
%%time
# Build a dask bag over the NetCDF URLs (worker_max*4 partitions) and map
# gen_json over it, so each file gets its own reference JSON.
bag = db.from_sequence(flist, npartitions=worker_max*4).map(gen_json)
# Execute the bag; retries=20 is passed through to compute().
bag.compute(retries=20)

#### Create combined references and store in Parquet files

In [None]:
# List the per-file reference JSONs and prepend the s3:// protocol.
json_list = [f's3://{key}' for key in fs_read.glob(f'{json_dir}/*.json')]
print(len(json_list))
first_json, last_json = json_list[0], json_list[-1]
print(first_json)
print(last_json)

In [None]:
# fs_write.rm(json_list)  # use this if you need to start over

In [None]:
import zarr

def modify_attrs(refs):
    """Set ``standard_name = 'time'`` on the ``ocean_time`` array.

    ``refs`` is opened with ``zarr.open`` and the attribute is assigned
    through the resulting object; the same ``refs`` object is returned.
    """
    group = zarr.open(refs)
    group.ocean_time.attrs['standard_name'] = 'time'
    return refs

def postprocess(refs):
    """Post-processing hook passed to MultiZarrToZarr: apply modify_attrs."""
    return modify_attrs(refs)

def preprocess(refs):
    """Drop the ``dstart`` variable from a reference mapping, in place.

    Reference keys are flat paths such as ``dstart/.zarray`` or ``dstart/0``,
    so comparing a key to the bare variable name alone never matches. Every
    key under the ``dstart/`` prefix is removed, as is an exact ``dstart``
    key if one is present.

    Parameters
    ----------
    refs : dict
        Mapping of reference keys to reference values; modified in place.

    Returns
    -------
    dict
        The same ``refs`` object, without any ``dstart`` entries.
    """
    prefix = 'dstart/'
    # Iterate over a snapshot of the keys because refs is mutated in the loop.
    for key in list(refs):
        if key == 'dstart' or key.startswith(prefix):
            refs.pop(key)
    return refs

In [None]:
# Local directory that will hold the combined Parquet reference set.
combined_parquet = 'combined.parq'

fs_local = fsspec.filesystem("file")
fs_local.makedirs(combined_parquet, exist_ok=True)

In [None]:
# Create an empty Parquet reference store in the local directory.
# Keyword arguments are used because the positional parameter order of
# LazyReferenceMapper.create() differs between fsspec releases.
out = LazyReferenceMapper.create(record_size=100000, root=combined_parquet, fs=fs_local)

In [None]:
%%time
# Combine the per-file reference JSONs into one reference set written to `out`.
_ = MultiZarrToZarr(
        json_list,
        remote_protocol="s3",
        # Concatenate the inputs along ocean_time.
        concat_dims=["ocean_time"],
        # NOTE(review): the "cf:" prefix presumably asks kerchunk to decode the
        # coordinate values using CF time conventions -- confirm in kerchunk docs.
        coo_map={"ocean_time": "cf:ocean_time"},
        # Horizontal grid coordinates, declared identical across all inputs.
        identical_dims=['lat_psi','lat_rho','lat_u','lat_v',
                        'lon_psi','lon_rho','lon_u','lon_v'],
        preprocess=preprocess,
        postprocess=postprocess,
        out=out).translate()
# Flush the reference mapper after translate() has finished.
out.flush()

In [None]:
# Destination prefix on S3 for the combined Parquet reference set.
combined_parquet_aws = 's3://usgs-coawst/useast-archive/combined.parq'
# Recursively upload the local Parquet directory using the non-anonymous filesystem.
# NOTE(review): if this prefix already exists from an earlier run, a recursive
# upload may nest the directory inside it -- confirm before re-running.
_ = fs_write.upload(combined_parquet, combined_parquet_aws, recursive=True)

#### Test opening combined dataset:

In [None]:
%%time
fs_ref = fsspec.implementations.reference.ReferenceFileSystem(
            combined_parquet_aws, remote_protocol="s3", target_protocol="s3", lazy=True)

ds = xr.open_dataset(fs_ref.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False}, chunks={})

ds

#### Write Intake Catalog

In [None]:
# Local file name of the generated Intake catalog.
cat_file = "coawst_intake.yml"

# Name of the catalog entry under `sources`.
dataset = 'COAWST-USEAST'

# Catalog text. `lazy` is written as a YAML boolean (unquoted) so it is
# parsed as True rather than as the string 'true'.
ymlentry = f"""# THIS FILE AUTO-GENERATED
# This 'header' information needed to make this yml valid.  
description: 'intake catalog example'
metadata:
  version: 1
  description: "Intake Catalog for data from USGS-COAWST model"

sources:
# If you are copy/paste-ing this entry to another catalog, you only need the 
# lines from here down in your 'sources' section (be sure to indent as appropriate).
  {dataset}:
    driver: intake_xarray.xzarr.ZarrSource
    description: 'USGS COAWST US East Coast and Gulf Coast Forecast Archive'
    args:
      urlpath: 'reference://'
      consolidated: false
      storage_options:
        target_options:
          anon: true
          skip_instance_cache: true
        fo: '{combined_parquet_aws}'
        lazy: true
        remote_options:
          anon: true
          skip_instance_cache: true
        remote_protocol: 's3'
"""

with open(cat_file, 'w') as ymlfile:
    ymlfile.write(ymlentry)

In [None]:
# S3 destination for the catalog file, in the same prefix as the archive.
cat_file_s3 = f's3://usgs-coawst/useast-archive/{cat_file}'

In [None]:
# Upload the local catalog file to S3 using the non-anonymous filesystem.
_ = fs_write.upload(cat_file, cat_file_s3)

In [None]:
# Display the catalog URL as the cell output.
cat_file_s3

#### Shut down cluster

In [None]:
# Close the client first, then shut the cluster down.
client.close()
cluster.shutdown()