# Using Kerchunk to analyze MUR SST Data


## Motivation
* Keeping a Zarr store up to date with new data may be more challenging than just adding an additional metadata file

### Import `kerchunk` and make sure it's at the latest version (`0.0.6` at the time of writing)

In [1]:
import kerchunk
kerchunk.__version__

'0.0.6'

_If Kerchunk is not at the latest version, update it with pip or conda and **restart the kernel**_

In [None]:
# !pip install --upgrade kerchunk

In [4]:
import dask.bag as db
import xarray as xr
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr
import fsspec
from glob import glob

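### Open a new filesystem of type `s3` (Amazon Web Services storage)
This tells `fsspec` what type of storage system to use (AWS S3) and which authentication options to apply. The PO.DAAC bucket is protected rather than public, so instead of anonymous mode (`anon=True`) this uses temporary Earthdata credentials saved under a named AWS profile.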

In [5]:
# get temporary credentials (requires an Earthdata login) from
# https://archive.podaac.earthdata.nasa.gov/s3credentials
# and add them to ~/.aws/credentials under a named profile (here 'aimee')
fs = fsspec.filesystem('s3', profile='aimee')

In [6]:
len(fs.ls('s3://podaac-ops-cumulus-protected/MUR-JPL-L4-GLOB-v4.1/'))

14763

Use `fs.glob()` to list only the NetCDF (`.nc`) files in the directory; `fs.ls()` above returns every object in it, not only the `.nc` files.

In [7]:
flist = fs.glob("s3://podaac-ops-cumulus-protected/MUR-JPL-L4-GLOB-v4.1/*.nc")

### Prepend `s3://` to the URLs
`fs.glob()` returns bucket/key paths without the protocol, so add it back to get full URLs.

In [8]:
flist = ['s3://' + f for f in flist]
len(flist)

7381
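`fs.ls()` returned 14763 entries while the `*.nc` glob returned 7381, so the directory holds other objects besides the NetCDF granules. The filter-and-prefix step can be sketched with the standard library alone. This is a minimal sketch: the listing is hypothetical (the `.md5` sidecar name is an assumption, not taken from the bucket), `nc_urls` is a hypothetical helper, and `fnmatch` only approximates `fs.glob()` for a single directory level.

```python
from fnmatch import fnmatch


def nc_urls(keys, pattern="*.nc", protocol="s3://"):
    """Keep keys whose file name matches `pattern`, then add the protocol.

    `keys` mimics what fs.ls()/fs.glob() return: bucket/key paths with no protocol.
    """
    return [protocol + k for k in keys if fnmatch(k.rsplit("/", 1)[-1], pattern)]


# Hypothetical listing: one granule plus an assumed checksum sidecar file
demo_listing = [
    "podaac-ops-cumulus-protected/MUR-JPL-L4-GLOB-v4.1/"
    "20020601090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc",
    "podaac-ops-cumulus-protected/MUR-JPL-L4-GLOB-v4.1/"
    "20020601090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc.md5",
]
demo_nc_urls = nc_urls(demo_listing)
print(demo_nc_urls)
```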

## Example of creating a single reference

In [7]:
u = flist[0]
u

's3://podaac-ops-cumulus-protected/MUR-JPL-L4-GLOB-v4.1/20020601090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc'

In [12]:
with fsspec.open(u, mode="rb", profile='aimee') as infile:
    reference = SingleHdf5ToZarr(infile, u, inline_threshold=100).translate()
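`inline_threshold` controls which chunks are copied into the reference itself: chunks smaller than the threshold (in bytes) are stored inline, while larger ones are stored as `[url, offset, length]` pointers into the original file. The sketch below counts the two kinds of entries in a reference dict laid out as `{"version": 1, "templates": {...}, "refs": {...}}`. It assumes inline entries (metadata and small chunks) are strings and remote entries are lists; `summarize_refs` and the toy reference are hypothetical, not real MUR output. On the real result you would call `summarize_refs(reference)`.

```python
def summarize_refs(ref):
    """Count inline (string) vs remote ([url, offset, length]) reference entries."""
    refs = ref.get("refs", ref)  # version-1 dicts nest the entries under "refs"
    inline = sum(isinstance(v, str) for v in refs.values())
    remote = sum(isinstance(v, list) for v in refs.values())
    return {"inline": inline, "remote": remote}


# Hypothetical toy reference in the version-1 layout
demo_ref = {
    "version": 1,
    "templates": {"u": "s3://bucket/file.nc"},
    "refs": {
        ".zgroup": '{"zarr_format": 2}',                 # metadata, stored inline
        "lat/0": "base64:AAAA",                          # small chunk stored inline
        "analysed_sst/0.0.0": ["{{u}}", 4096, 1048576],  # pointer into the file
    },
}
print(summarize_refs(demo_ref))  # {'inline': 2, 'remote': 1}
```

Raising the threshold makes the reference larger but saves a remote read for each small chunk.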

### Use ESIP Jupyterhub Dask cluster to generate all the references

Use this instead of a local Dask cluster when running on ESIP's JupyterHub at https://jupyter.qhub.esipfed.org

In [14]:
import os
import sys
import importlib

# ebdpy is loaded from the shared library directory on the ESIP JupyterHub
sys.path.append(os.path.join(os.environ['HOME'], 'shared', 'users', 'lib'))
import ebdpy as ebd
importlib.reload(ebd)  # pick up any edits to ebdpy when the cell is re-run

profile = 'aimee'
region = 'us-west-2'
endpoint = f's3.{region}.amazonaws.com'
ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)
worker_max = 20

client, cluster = ebd.start_dask_cluster(profile=profile,
                                         worker_max=worker_max,
                                         region=region,
                                         use_existing_cluster=False,
                                         adaptive_scaling=False,
                                         wait_for_cluster=True,
                                         worker_profile='Pangeo Worker',
                                         propagate_env=True)



Region: us-west-2
Existing Dask clusters:
Cluster Index c_idx: 0 / Name: dev.26e9bd587eb944c89db9dcefd3e65b01 ClusterStatus.RUNNING
Cluster Index c_idx: 1 / Name: dev.c3682651305b4df1a5567b3559922d36 ClusterStatus.RUNNING
Cluster Index c_idx: 2 / Name: dev.bf18dd629f3a4ebea6715843a7e96acf ClusterStatus.RUNNING
Cluster Index c_idx: 3 / Name: dev.61cd030b190543eb9c8e2666e55e8d64 ClusterStatus.RUNNING
Starting new cluster.
{}
Setting Cluster Environment Variable AWS_DEFAULT_REGION us-west-2
Setting Fixed Scaling workers=20
Reconnect client to clear cache
client.dashboard_link (for new browser tab/window or dashboard searchbar in Jupyterhub):
https://jupyter.qhub.esipfed.org/gateway/clusters/dev.43e0aecc2bb64d21b0d15e5c10c72ac3/status
Elapsed time to wait for 20 live workers:
20/20 workers - 112 seconds
Propagating environment variables to workers
Using environment: users/pangeo


In [15]:
client

Connection method: Cluster object
Cluster type: dask_gateway.GatewayCluster
Dashboard: https://jupyter.qhub.esipfed.org/gateway/clusters/dev.43e0aecc2bb64d21b0d15e5c10c72ac3/status


## Define `gen_ref` function to return a reference dictionary for a given S3 file URL

This function does the following:
1. Use `fsspec.open()` to open the file at URL `f` in binary mode, with `anon=False` because the bucket requires credentials
2. Pass the file object `infile` and the URL `f` to `kerchunk.hdf.SingleHdf5ToZarr()` (inlining chunks smaller than 300 bytes) and generate the reference with `.translate()`

In [9]:
def gen_ref(f):
    with fsspec.open(f, mode="rb", anon=False) as infile:
        return SingleHdf5ToZarr(infile, f, inline_threshold=300).translate()

### Map `gen_ref` to each member of `flist` and compute
A Dask bag maps a function over a collection of inputs. The next cells tell Dask to take all the files in `flist`, split them into as many partitions as there are files (`npartitions=len(flist)`, i.e. one file per partition) and apply `gen_ref()` to each one. Calling `bag.compute()` then runs `gen_ref()` in parallel on as many workers as are available to the Dask client.

_Note: if running interactively on Binder, this will take a while, since only one worker is available and the references have to be generated serially. See the option for loading references from JSON files below._
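Outside a Dask cluster, the same one-task-per-file map can be sketched with the standard library's `concurrent.futures`. This swaps in a thread pool for the Dask bag; threads can help when the work is dominated by waiting on network reads, but they are not a drop-in replacement for a multi-machine cluster. `fake_gen_ref` and `map_refs` are hypothetical stand-ins so the sketch runs without credentials.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_gen_ref(url):
    """Hypothetical stand-in for gen_ref(): returns a reference-shaped dict."""
    return {"version": 1, "templates": {"u": url}, "refs": {}}


def map_refs(urls, func, max_workers=8):
    """Apply `func` to every URL concurrently; results keep the input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(func, urls))


demo_urls = [f"s3://bucket/file{i}.nc" for i in range(5)]
demo_refs = map_refs(demo_urls, fake_gen_ref)
print(len(demo_refs), demo_refs[0]["templates"]["u"])  # 5 s3://bucket/file0.nc
```

With real data, `map_refs(flist, gen_ref)` would follow the same pattern.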

In [10]:
bag = db.from_sequence(flist, npartitions=len(flist)).map(gen_ref)

In [12]:
# probably don't want to do this if you have a long flist
# bag.visualize()

In [20]:
%time dicts = bag.compute()

CPU times: user 48.6 s, sys: 7.19 s, total: 55.8 s
Wall time: 19min 26s


Now each URL in `flist` has been used to generate a dictionary of reference data in `dicts`.

In [21]:
len(dicts)

7355
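Because `bag.compute()` returns one dictionary per input, it is worth confirming that every URL you intend to combine produced a reference. The sketch below compares the `templates['u']` URL stored in each reference (the same field used to name the saved JSON files) against a list of input URLs. `missing_refs` is a hypothetical helper and the short lists are made up.

```python
def missing_refs(urls, refs):
    """Return input URLs that have no matching reference (matched on templates['u'])."""
    covered = {r["templates"]["u"] for r in refs}
    return [u for u in urls if u not in covered]


# Hypothetical inputs: three URLs but only two references
demo_inputs = ["s3://bucket/a.nc", "s3://bucket/b.nc", "s3://bucket/c.nc"]
demo_outputs = [{"templates": {"u": "s3://bucket/a.nc"}},
                {"templates": {"u": "s3://bucket/c.nc"}}]
print(missing_refs(demo_inputs, demo_outputs))  # ['s3://bucket/b.nc']
```

In the notebook this would be `missing_refs(flist, dicts)`, assuming `flist` is the list the bag was built from.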

### _Save/load references to/from JSON files (optional)_
The individual dictionaries can be saved as JSON files if desired, and loaded back later instead of regenerating them.

In [25]:
import os
import ujson

directory = './mursst_jsons/individual/'
os.makedirs(directory, exist_ok=True)  # make sure the output directory exists
# This could be done in dask but is actually pretty fast
for d in dicts:
    # Generate name from corresponding URL:
    # grab the URL, strip everything but the filename,
    # and replace .nc with .json
    name = directory + d['templates']['u'].split('/')[-1].replace('.nc', '.json')

    with open(name, 'w') as outf:
        outf.write(ujson.dumps(d))
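To load saved references back (for example on Binder, instead of regenerating them), read each JSON file into a dictionary. A minimal sketch using the standard library `json` module (the files are plain JSON even though they were written with `ujson`); sorting the paths keeps chronological order because the MUR file names begin with a timestamp. `load_refs` is a hypothetical helper, and the demo writes two made-up reference files to a temporary directory so it runs on its own.

```python
import json
import os
import tempfile
from glob import glob


def load_refs(directory):
    """Read every *.json reference file in `directory`, sorted by file name."""
    refs = []
    for path in sorted(glob(os.path.join(directory, "*.json"))):
        with open(path) as f:
            refs.append(json.load(f))
    return refs


# Demo with two hypothetical reference files, written out of order
with tempfile.TemporaryDirectory() as tmp:
    for stamp in ["20020602090000", "20020601090000"]:
        ref = {"version": 1, "templates": {"u": f"s3://bucket/{stamp}.nc"}, "refs": {}}
        with open(os.path.join(tmp, f"{stamp}.json"), "w") as f:
            json.dump(ref, f)
    demo_loaded = load_refs(tmp)

print([r["templates"]["u"] for r in demo_loaded])
```

In the notebook, `dicts = load_refs(directory)` would rebuild the list from the saved files.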

***
### Use `MultiZarrToZarr` to combine the individual references into a single reference
In this example we pass a list of reference dictionaries, but `MultiZarrToZarr` also accepts a list of `.json` file paths (see the commented-out alternative).

_Note: the Kerchunk `MultiZarrToZarr` API changed between versions 0.0.5 and 0.0.6. This part assumes the latest version at this time (0.0.6). Please see https://fsspec.github.io/kerchunk/reference.html#kerchunk.combine.MultiZarrToZarr for more details_
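Because the `MultiZarrToZarr` arguments changed between 0.0.5 and 0.0.6, a guard can fail early on an older install. A minimal sketch that compares numeric version components; it assumes plain `X.Y.Z` version strings (pre-release tags such as `dev1` would raise `ValueError`), and `version_tuple`/`require_version` are hypothetical helpers. For anything more complex, the `packaging` library's `Version` class is the robust choice.

```python
def version_tuple(version):
    """Turn a plain 'X.Y.Z' version string into a tuple of ints for comparison."""
    return tuple(int(part) for part in version.split("."))


def require_version(installed, minimum="0.0.6"):
    """Raise RuntimeError if `installed` is older than `minimum`."""
    if version_tuple(installed) < version_tuple(minimum):
        raise RuntimeError(
            f"kerchunk {installed} found; {minimum} or newer is needed "
            "for this MultiZarrToZarr call"
        )


require_version("0.0.6")  # passes silently
```

In the notebook: `require_version(kerchunk.__version__)`.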

In [26]:
mzz = MultiZarrToZarr(
    dicts,  # or sorted(glob(directory + '*.json')),
    remote_protocol='s3',
    remote_options={'anon':False},
    concat_dims=['time'],
    inline_threshold=0
)
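Conceptually, `concat_dims=['time']` stitches the per-file references into one store whose chunk keys are offset along `time`. A simplified sketch of that key renaming, assuming each file holds exactly one time step, `time` is the first axis of the variable, and chunks are one step long in time. The real `MultiZarrToZarr` also builds the `time` coordinate from the data and rewrites metadata such as `.zarray` shapes, which this omits. `concat_along_time` and the demo references are hypothetical.

```python
def concat_along_time(ref_dicts, var):
    """Merge chunk references for `var` from several version-1 reference dicts.

    Assumes one time step per file with time as the first axis, so a key
    'var/0.j.k' from the i-th file becomes 'var/i.j.k' in the result.
    """
    combined = {}
    for i, ref in enumerate(ref_dicts):
        url = ref["templates"]["u"]
        for key, value in ref["refs"].items():
            name, _, chunk = key.partition("/")
            if name != var or not chunk or chunk.startswith("."):
                continue  # skip other variables and metadata keys such as 'var/.zarray'
            if isinstance(value, list) and value[0] == "{{u}}":
                value = [url] + value[1:]  # resolve the per-file URL template
            rest = chunk.split(".")[1:]  # drop the per-file time index (always 0 here)
            combined[".".join([f"{var}/{i}"] + rest)] = value
    return combined


# Two hypothetical single-time-step references for one variable
demo_files = [
    {"templates": {"u": f"s3://bucket/day{d}.nc"},
     "refs": {"analysed_sst/.zarray": "{}",
              "analysed_sst/0.0.0": ["{{u}}", 100, 50]}}
    for d in (1, 2)
]
print(concat_along_time(demo_files, "analysed_sst"))
# {'analysed_sst/0.0.0': ['s3://bucket/day1.nc', 100, 50], 'analysed_sst/1.0.0': ['s3://bucket/day2.nc', 100, 50]}
```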

`mzz.translate()` writes the combined references to a file when given a filename (here `combined.json`); called without one, it returns them as a dictionary (e.g. `mzz_dict = mzz.translate()`).

In [27]:
%time mzz.translate('./combined.json')

CPU times: user 19min 53s, sys: 15.5 s, total: 20min 9s
Wall time: 20min 4s
