# Virtual Data Set Reference File Creation for Pilot Run on PO.DAAC

#### *Author: Dean Henze, PO.DAAC*

## Summary

This notebook creates virtual data set reference files for a few collections on PO.DAAC. It is part of a pilot project to host these reference files on PO.DAAC, link to their location in CMR, and access them with a wrapper function around earthaccess.

The reference files are created with kerchunk. A future version may use the virtualizarr package.

The notebook currently produces a reference file for the MUR 0.01 degree collection.

**This notebook is meant to be run in the cloud**

## Import Packages
This notebook was run in a Python 3.12 environment. The minimal working install was:
```
pip install kerchunk==0.2.6 fastparquet==2024.5.0 xarray==2024.1.0 earthaccess==0.11.0 fsspec==2024.10.0 "dask[complete]"==2024.5.2 h5netcdf==1.3.0 ujson==5.10.0 matplotlib==3.9.2 jupyterlab jupyter-server-proxy coiled==1.58.0
```

In [1]:
# Built-in packages
import os
import json

# Filesystem management 
import fsspec
import earthaccess

# Data analysis
import xarray as xr
import pandas as pd
from kerchunk.df import refs_to_dataframe
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr

# Parallel computing 
import multiprocessing
from dask import delayed
import dask.array as da
from dask.distributed import Client
import coiled

# Other
import ujson

## 1. Reference file for first decade of MUR 0.01 collection

This is done in three steps:
1. Create reference files for each day,
2. Combine the daily reference files into yearly reference files,
3. Combine the yearly files into a single file.

Additionally, the final combined reference file is generated in both JSON and PARQUET formats.

### 1.1 Daily reference files

**Locate MUR S3 endpoints**: Handling access credentials to Earthdata and then finding the endpoints can be done in a number of ways (e.g. with the `requests` or `s3fs` packages), but the `earthaccess` package is used here.

In [2]:
# Get Earthdata creds
earthaccess.login()

Enter your Earthdata Login username:  deanh808
Enter your Earthdata password:  ········


<earthaccess.auth.Auth at 0x7fbe52f6ade0>

In [3]:
# Get AWS creds
fs = earthaccess.get_s3_filesystem(daac="PODAAC")

In [18]:
# Locate MUR file information and generate file-like objects:
granule_info = earthaccess.search_data(
    short_name="MUR-JPL-L4-GLOB-v4.1",
    temporal=("2002-01-01", "2012-12-31"),
    )
fobjs = earthaccess.open(granule_info)

QUEUEING TASKS | :   0%|          | 0/3868 [00:00<?, ?it/s]

PROCESSING TASKS | :   0%|          | 0/3868 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/3868 [00:00<?, ?it/s]

In [9]:
len(fobjs)

3868

**Small wrapper function around kerchunk and earthaccess packages for reference generation of a single data file.**

In [10]:
def single_ref_earthaccess(fobj, dir_save=None):
    """
    Create a reference for a single data file. "fobj" (earthaccess.store.EarthAccessFile 
    object) is the output from earthaccess.open(), which also has the file endpoint. 
    Option to save as a JSON to directory "dir_save", with file name of the corresponding 
    data file with ".json" appended. Otherwise the reference info and endpoint are returned.
    """
    endpoint = fobj.full_name
    reference = SingleHdf5ToZarr(fobj, endpoint, inline_threshold=0).translate()
    
    if dir_save is not None:
        with open(dir_save + endpoint.split('/')[-1]+'.json', 'w') as outf:
            outf.write(ujson.dumps(reference))
    else:
        # Return both the kerchunk reference and the path to the file on podaac-ops-cumulus-protected
        return reference, endpoint
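
For orientation, the object returned by `translate()` is a plain dictionary in kerchunk's version 1 reference layout: a `refs` mapping in which Zarr metadata keys hold JSON strings and chunk keys hold `[url, offset, length]` triples. The sketch below builds a tiny hand-written example of that layout and separates metadata from chunk pointers. The URL, offsets, lengths, and the helper name `split_refs` are made up for illustration and are not taken from a MUR file.

```python
import json

# Hand-written toy reference in kerchunk's version 1 layout.
# The URL, byte offsets, and lengths are hypothetical placeholders.
toy_reference = {
    "version": 1,
    "refs": {
        ".zgroup": json.dumps({"zarr_format": 2}),
        "analysed_sst/.zarray": json.dumps({"shape": [1, 4, 4], "chunks": [1, 2, 2]}),
        "analysed_sst/0.0.0": ["s3://example-bucket/file.nc", 1000, 250],
        "analysed_sst/0.0.1": ["s3://example-bucket/file.nc", 1250, 260],
    },
}

def split_refs(reference):
    """Return (metadata_keys, chunk_keys) for a kerchunk-style reference dict."""
    metadata, chunks = [], []
    for key, value in reference["refs"].items():
        # Chunk pointers are [url, offset, length] lists; metadata is stored as strings.
        (chunks if isinstance(value, list) else metadata).append(key)
    return metadata, chunks

metadata_keys, chunk_keys = split_refs(toy_reference)
print(len(metadata_keys), "metadata keys,", len(chunk_keys), "chunk pointers")
```

Because the function above passes `inline_threshold=0`, the MUR references should contain byte-range pointers rather than inlined chunk data, so nearly all of the size of each JSON comes from entries like the two chunk keys here.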

**Parallelize the above function and run it for all MUR files**, using a distributed cluster with Coiled.

In [13]:
## Save reference JSONs in this directory:
dir_refs_indv = './reference_jsons_individual/'
os.makedirs(dir_refs_indv, exist_ok=True)  # no error if the directory already exists

In [14]:
%%time

## --------------------------------------------
## Create single reference files with parallel computing using Coiled
## --------------------------------------------

# Wrap `single_ref_earthaccess` into a coiled function:
single_ref_earthaccess_par = coiled.function(
    region="us-west-2", spot_policy="on-demand", 
    vm_type="m6g.medium", n_workers=100
    )(single_ref_earthaccess)

# Begin computations:
results = single_ref_earthaccess_par.map(fobjs)

# Save results to JSONs as they become available:
for reference, endpoint in results:
    name_ref = dir_refs_indv + endpoint.split('/')[-1].replace('.nc', '.json')
    with open(name_ref, 'w') as outf:
        outf.write(ujson.dumps(reference))

CPU times: user 16.1 s, sys: 1.06 s, total: 17.2 s
Wall time: 11min 42s
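
The save loop above derives each JSON name by replacing `.nc` in the data file name. A minimal sketch of the same idea that swaps only the final suffix is shown below. The helper name `ref_name_for` is hypothetical, and the example endpoint is an assumption: the bucket name and file-name pattern appear in this notebook, but the exact key layout is guessed for illustration.

```python
from pathlib import PurePosixPath

def ref_name_for(endpoint, dir_refs):
    """Build the local JSON path for a data-file endpoint, swapping only the final suffix."""
    return dir_refs + PurePosixPath(endpoint).with_suffix(".json").name

# Hypothetical endpoint (key layout assumed, not taken from a search result).
example_endpoint = (
    "s3://podaac-ops-cumulus-protected/MUR-JPL-L4-GLOB-v4.1/"
    "20020601090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc"
)
example_ref_name = ref_name_for(example_endpoint, "./reference_jsons_individual/")
print(example_ref_name)
```

For MUR file names the two approaches give the same result, since `.nc` occurs only once, at the end of the name.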


In [15]:
single_ref_earthaccess_par.cluster.shutdown()

### 1.2 Create yearly combined reference files

The cell below splits the paths to the individual reference files into a list of lists, where each inner list holds the paths for a single year. It is a somewhat roundabout way of doing this.

In [18]:
# All filenames, sorted:
fnames_refs_indv = [f for f in os.listdir(dir_refs_indv) if f.endswith('.json')]
fnames_refs_indv.sort()

# Put filenames into pandas df, add columns for abs path and year:
refs_indv_df = pd.DataFrame({"fnames_refs_indv": fnames_refs_indv})
refs_indv_df["paths_refs_indv"] = refs_indv_df["fnames_refs_indv"].apply(lambda x: dir_refs_indv+x)
refs_indv_df["year"] = refs_indv_df["fnames_refs_indv"].apply(lambda x: x[0:4])

# Use pandas' groupby functionality to group by year then convert to list of lists:
refs_grouped = refs_indv_df.groupby("year")
years = list(refs_grouped.groups.keys())
paths_indv_grouped_list = [list(refs_grouped.get_group(y)["paths_refs_indv"]) for y in years]
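
The same grouping can be sketched without pandas. The version below is an alternative, not what was run for the pilot: it relies on the file names starting with a `YYYYMMDD...` time stamp, as the cell above does, and uses `itertools.groupby`. The function name and the example file names are hypothetical.

```python
from itertools import groupby

def group_paths_by_year(filenames, directory):
    """Group reference file names by their 4-digit year prefix.

    Returns (years, grouped_paths), where grouped_paths[i] lists the paths for years[i].
    """
    years, grouped = [], []
    # groupby only merges adjacent items, so sort first (names start with YYYYMMDD...).
    for year, names in groupby(sorted(filenames), key=lambda name: name[:4]):
        years.append(year)
        grouped.append([directory + name for name in names])
    return years, grouped

# Hypothetical file names following the MUR naming pattern, for illustration only.
example_names = [
    "20030101090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.json",
    "20020601090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.json",
    "20020602090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.json",
]
years_example, paths_example = group_paths_by_year(example_names, "./reference_jsons_individual/")
print(years_example)
print([len(p) for p in paths_example])
```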

This step is parallelized on a local cluster rather than a distributed one. If the paths of the individual reference files are passed to VMs on a distributed cluster, those VMs don't seem to be able to resolve them (probably because the paths are relative to this VM).

In [12]:
## Save reference JSONs in this directory:
dir_refs_yearly = './reference_files_yearly/'
os.makedirs(dir_refs_yearly, exist_ok=True)  # no error if the directory already exists


In [14]:
## Small wrapper around kerchunk functionality. This is what will be parallelized:
def multizarrtozarr(paths, kwargs_mzz=None):
    # Treat a missing kwargs dict as empty so the default argument does not raise
    mzz = MultiZarrToZarr(paths, **(kwargs_mzz or {}))
    return mzz.translate()

In [45]:
# Check how many CPUs are on this VM:
cpu_count = multiprocessing.cpu_count()
print("CPU count =", cpu_count)

CPU count = 4


In [46]:
# Start up cluster and print some information about it:
client = Client(n_workers=cpu_count, threads_per_worker=1)
print(client.cluster)
print("View any work being done on the cluster here", client.dashboard_link)

LocalCluster(5fef2dfd, 'tcp://127.0.0.1:40827', workers=4, threads=4, memory=14.83 GiB)
View any work being done on the cluster here https://cluster-mwgis.dask.host/jupyter/proxy/8787/status


In [49]:
%%time

# Setup parallel computations:
kwargs_mzz = {'remote_protocol':"s3", 'remote_options':fs.storage_options, 'concat_dims':["time"]}
multizarrtozarr_par = delayed(multizarrtozarr)
tasks = [multizarrtozarr_par(pl, kwargs_mzz=kwargs_mzz) for pl in paths_indv_grouped_list]

# Run parallel computations:
results = da.compute(*tasks)



CPU times: user 14.3 s, sys: 3.36 s, total: 17.6 s
Wall time: 5min 32s


In [51]:
## Save results as JSONs:
for ref, y in zip(results, years):
    name_ref = dir_refs_yearly + y + "-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.json"
    with open(name_ref, 'w') as outf:
        outf.write(ujson.dumps(ref))

### 1.3 Create total combined reference file in JSON

In [16]:
%%time

kwargs_mzz = {'remote_protocol':"s3", 'remote_options':fs.storage_options, 'concat_dims':["time"]}
# Sort so the yearly files are passed in a deterministic (chronological) order:
paths_refs_yearly = sorted(dir_refs_yearly+f for f in os.listdir(dir_refs_yearly) if f.endswith('.json'))
ref_total = multizarrtozarr(paths_refs_yearly, kwargs_mzz=kwargs_mzz)

with open("MUR-JPL-L4-GLOB-v4.1_combined-ref.json", 'w') as outf:
    outf.write(ujson.dumps(ref_total))

CPU times: user 29.5 s, sys: 2.98 s, total: 32.5 s
Wall time: 32.4 s


### 1.4 Create total combined reference file in PARQUET

In [4]:
%%time
# Save reference info to parquet:
with open("MUR-JPL-L4-GLOB-v4.1_combined-ref.json") as f:
    ref_combined = json.load(f)
refs_to_dataframe(ref_combined, "MUR-JPL-L4-GLOB-v4.1_combined-ref.parq")

CPU times: user 18.6 s, sys: 1.79 s, total: 20.4 s
Wall time: 21.5 s


In [5]:
## Compare size of JSON vs parquet, printed in MB
# JSON
print("JSON:", os.path.getsize("MUR-JPL-L4-GLOB-v4.1_combined-ref.json")/10**6, "MB")
# parquet (stored as a directory, so sum the sizes of all files in it)
size_parq = 0
for path, dirs, files in os.walk("MUR-JPL-L4-GLOB-v4.1_combined-ref.parq"):
    for f in files:
        fp = os.path.join(path, f)
        size_parq += os.path.getsize(fp)
print("PARQUET:", size_parq/10**6, "MB")

JSON: 630.652399 MB
PARQUET: 19.531077 MB
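
As a rough sketch, the size comparison above can be wrapped in one helper that handles both a single file (the JSON) and a directory tree (the Parquet reference), followed by the ratio of the two sizes reported above. The helper name `path_size_mb` is hypothetical, and the temporary directory exists only to exercise the helper.

```python
import os
import tempfile

def path_size_mb(path):
    """Size in MB (10**6 bytes) of a file, or of all files under a directory."""
    if os.path.isfile(path):
        return os.path.getsize(path) / 10**6
    total = 0
    for root, _dirs, files in os.walk(path):
        total += sum(os.path.getsize(os.path.join(root, f)) for f in files)
    return total / 10**6

# Small self-check on a throwaway directory holding 1000 + 500 bytes.
with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "sub"))
    for rel, nbytes in [("a.bin", 1000), (os.path.join("sub", "b.bin"), 500)]:
        with open(os.path.join(tmp, rel), "wb") as f:
            f.write(b"\0" * nbytes)
    demo_size_mb = path_size_mb(tmp)

# Ratio of the JSON and Parquet sizes printed above.
size_ratio = 630.652399 / 19.531077
print(f"demo directory: {demo_size_mb} MB; JSON is about {size_ratio:.0f}x larger than Parquet")
```

For these files the Parquet reference is roughly 32 times smaller than the JSON.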


### 1.5 Verify that the reference files work

In [4]:
def opendf_withref(ref, fs_data):
    """
    "ref" is a reference file or object. "fs_data" is a filesystem with credentials to
    access the actual data files. 
    """
    storage_opts = {"fo": ref, "remote_protocol": "s3", "remote_options": fs_data.storage_options}
    fs_ref = fsspec.filesystem('reference', **storage_opts)
    m = fs_ref.get_mapper('')
    data = xr.open_dataset(
        m, engine="zarr", chunks={},
        backend_kwargs={"consolidated": False}
        )
    return data

**JSON reference**

In [8]:
%%time
data = opendf_withref("MUR-JPL-L4-GLOB-v4.1_combined-ref.json", fs)

CPU times: user 8.26 s, sys: 1.06 s, total: 9.32 s
Wall time: 9.53 s


In [9]:
data

Dataset repr (four data-variable arrays; variable names not preserved in this output):

| Array bytes | Chunk bytes | Array shape | Chunk shape | Dask graph | Data type |
|---|---|---|---|---|---|
| 18.24 TiB | 15.98 MiB | (3868, 17999, 36000) | (1, 1023, 2047) | 1253232 chunks in 2 graph layers | float64 numpy.ndarray |
| 18.24 TiB | 15.98 MiB | (3868, 17999, 36000) | (1, 1023, 2047) | 1253232 chunks in 2 graph layers | float64 numpy.ndarray |
| 9.12 TiB | 15.98 MiB | (3868, 17999, 36000) | (1, 1447, 2895) | 653692 chunks in 2 graph layers | float32 numpy.ndarray |
| 18.24 TiB | 31.96 MiB | (3868, 17999, 36000) | (1, 1447, 2895) | 653692 chunks in 2 graph layers | float64 numpy.ndarray |

In [10]:
data["analysed_sst"]

| Array bytes | Chunk bytes | Array shape | Chunk shape | Dask graph | Data type |
|---|---|---|---|---|---|
| 18.24 TiB | 15.98 MiB | (3868, 17999, 36000) | (1, 1023, 2047) | 1253232 chunks in 2 graph layers | float64 numpy.ndarray |
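
The numbers in the `analysed_sst` repr above can be cross-checked with a little arithmetic: the chunk count is the product of the per-dimension chunk counts, and the sizes follow from the element count times 8 bytes for float64. The helper name `chunk_stats` is hypothetical; the shapes and chunk shapes are copied from the output above.

```python
import math

def chunk_stats(shape, chunks, itemsize):
    """Return (n_chunks, array_bytes, chunk_bytes) for a regularly chunked array."""
    n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))
    array_bytes = math.prod(shape) * itemsize
    chunk_bytes = math.prod(chunks) * itemsize
    return n_chunks, array_bytes, chunk_bytes

shape = (3868, 17999, 36000)
n_chunks, array_bytes, chunk_bytes = chunk_stats(shape, (1, 1023, 2047), itemsize=8)  # float64
print(n_chunks)                              # 1253232
print(round(array_bytes / 2**40, 2), "TiB")  # 18.24 TiB
print(round(chunk_bytes / 2**20, 2), "MiB")  # 15.98 MiB
```

Each time step is split into 18 x 18 chunks, so 3868 time steps give 3868 x 324 = 1253232 chunks, matching the Dask graph size reported by xarray.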


Compute a regional mean at several time stamps using the reference file, and confirm we get the same results when we use the corresponding native netCDF files.

In [8]:
def compare_ref_to_native(data_ref, t, lat_slice=(-45,45), lon_slice=(-10,10)):
    """
    Compares the MUR data set opened with the combined reference file to the corresponding native netCDF file.
    Does this by taking a single time stamp and computing the mean SST for a sub-region of the globe, first 
    using the data opened with the reference file. Then finds the netCDF file that corresponds to that time stamp,
    opens it, and computes the mean over that same region.
    """
    # Compute regional mean with reference file
    mean_ref = data_ref["analysed_sst"].sel(time=t, lat=slice(*lat_slice), lon=slice(*lon_slice)).mean().compute().item()

    # Open native netcdf file and compute region mean
    shortname = "MUR-JPL-L4-GLOB-v4.1"
    fobj = earthaccess.open(earthaccess.search_data(short_name=shortname, temporal=(str(t), str(t))))
    data_ncnative = xr.open_dataset(fobj[0])
    mean_ncnative = data_ncnative["analysed_sst"].sel(time=t, lat=slice(*lat_slice), lon=slice(*lon_slice)).mean().item()

    print(mean_ref)
    print(mean_ncnative)

    return

In [35]:
t1 = data["time"].isel(time=1).values
compare_ref_to_native(data, t1, lat_slice=(-45,45), lon_slice=(-10,10))

t2 = data["time"].isel(time=1000).values
compare_ref_to_native(data, t2, lat_slice=(-45,45), lon_slice=(-10,10))

t3 = data["time"].isel(time=3333).values
compare_ref_to_native(data, t3, lat_slice=(-45,45), lon_slice=(-10,10))

QUEUEING TASKS | :   0%|          | 0/1 [00:00<?, ?it/s]

PROCESSING TASKS | :   0%|          | 0/1 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/1 [00:00<?, ?it/s]

294.1321070719369
294.1321070719374


QUEUEING TASKS | :   0%|          | 0/1 [00:00<?, ?it/s]

PROCESSING TASKS | :   0%|          | 0/1 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/1 [00:00<?, ?it/s]

295.8193785030412
295.8193785030416


QUEUEING TASKS | :   0%|          | 0/1 [00:00<?, ?it/s]

PROCESSING TASKS | :   0%|          | 0/1 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/1 [00:00<?, ?it/s]

292.9823902085813
292.9823902085815
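
The printed pairs differ only in the last few digits. That is the size of difference that floating-point summation order can plausibly produce (a chunked dask reduction versus a single in-memory reduction), although the cause was not investigated here. A minimal sketch of a tolerance-based comparison, using the values copied from the output above, is shown below. The tolerance `rel_tol=1e-12` is an arbitrary illustrative choice, not a value from this notebook.

```python
import math

# (reference-file mean, native-file mean) pairs copied from the output above
pairs = [
    (294.1321070719369, 294.1321070719374),
    (295.8193785030412, 295.8193785030416),
    (292.9823902085813, 292.9823902085815),
]

for mean_ref, mean_native in pairs:
    rel_diff = abs(mean_ref - mean_native) / abs(mean_native)
    # rel_tol=1e-12 is an illustrative tolerance, far looser than the observed differences
    is_close = math.isclose(mean_ref, mean_native, rel_tol=1e-12)
    print(f"relative difference {rel_diff:.1e}, close: {is_close}")
```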


**PARQUET reference**

In [5]:
%%time
data_parq = opendf_withref("MUR-JPL-L4-GLOB-v4.1_combined-ref.parq", fs)

CPU times: user 150 ms, sys: 20.1 ms, total: 170 ms
Wall time: 432 ms


In [6]:
data_parq

Dataset repr (four data-variable arrays; variable names not preserved in this output):

| Array bytes | Chunk bytes | Array shape | Chunk shape | Dask graph | Data type |
|---|---|---|---|---|---|
| 18.24 TiB | 15.98 MiB | (3868, 17999, 36000) | (1, 1023, 2047) | 1253232 chunks in 2 graph layers | float64 numpy.ndarray |
| 18.24 TiB | 15.98 MiB | (3868, 17999, 36000) | (1, 1023, 2047) | 1253232 chunks in 2 graph layers | float64 numpy.ndarray |
| 9.12 TiB | 15.98 MiB | (3868, 17999, 36000) | (1, 1447, 2895) | 653692 chunks in 2 graph layers | float32 numpy.ndarray |
| 18.24 TiB | 31.96 MiB | (3868, 17999, 36000) | (1, 1447, 2895) | 653692 chunks in 2 graph layers | float64 numpy.ndarray |


In [9]:
t1 = data_parq["time"].isel(time=1).values
compare_ref_to_native(data_parq, t1, lat_slice=(-45,45), lon_slice=(-10,10))

t2 = data_parq["time"].isel(time=1000).values
compare_ref_to_native(data_parq, t2, lat_slice=(-45,45), lon_slice=(-10,10))

t3 = data_parq["time"].isel(time=3333).values
compare_ref_to_native(data_parq, t3, lat_slice=(-45,45), lon_slice=(-10,10))

QUEUEING TASKS | :   0%|          | 0/1 [00:00<?, ?it/s]

PROCESSING TASKS | :   0%|          | 0/1 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/1 [00:00<?, ?it/s]

294.1321070719369
294.1321070719374


QUEUEING TASKS | :   0%|          | 0/1 [00:00<?, ?it/s]

PROCESSING TASKS | :   0%|          | 0/1 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/1 [00:00<?, ?it/s]

295.8193785030412
295.8193785030416


QUEUEING TASKS | :   0%|          | 0/1 [00:00<?, ?it/s]

PROCESSING TASKS | :   0%|          | 0/1 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/1 [00:00<?, ?it/s]

292.9823902085813
292.9823902085815
