# VirtualiZarr Useful Recipes with NASA Earthdata

#### *Author: Dean Henze, PO.DAAC*

*Reference herein to any specific commercial product, process, or service by trade name, trademark, manufacturer, or otherwise, does not constitute or imply its endorsement by the United States Government or the Jet Propulsion Laboratory, California Institute of Technology.*

## Summary

This notebook goes through several functionalities of the VirtualiZarr package to create virtual reference files, specifically using it with NASA Earthdata and utilizing the `earthaccess` package. It is meant to be a quick-start reference that introduces some key capabilities / characteristics of the package once a user has a high-level understanding of virtual data sets and the cloud-computing challenges they address (see references in the *Prerequisite knowledge* section below). In short, VirtualiZarr is a Python package to create "reference files", which can be thought of as road maps for the computer to efficiently navigate through large arrays in a single data file, or across many files. Once a reference file for a data set is created, utilizing it to open the data can speed up several processes including lazy loading, accessing subsets, and in some cases performing computations. Importantly, one can create a combined reference for all the files in a dataset and use it to lazy load / access the entire record at once.

The functionalities of VirtualiZarr (with earthaccess) covered in this notebook are:

1. **Getting Data File endpoints in Earthdata Cloud** which are needed for virtualizarr to create reference files.
2. **Generating reference files for 1 day, 1 year, and the entire record of a ~750 GB data set**. The data set used is the Level 4 global gridded 6-hourly wind product from the Cross-Calibrated Multi-Platform project (https://doi.org/10.5067/CCMP-6HW10M-L4V31), available on PO.DAAC. This section also covers speeding up the reference creation using parallel computing. Reference files are saved in both JSON and PARQUET formats. The latter is an important format as it reduces the reference file size by ~30x in our tests. *Saving in ice chunk formats will be tested / covered in the coming months.*
3. **Combining reference files (in progress)**. The ability to combine reference files together is valuable, for example to upate reference files for forward-streaming datasets when new data are available, without re-creating the entire record from scratch. However, with the current workflows and version of VirtualiZarr, this is not possible due to our use of a specific kwarg when creating the reference files. The workflow is still included here (with errors) because it is anticipated that this will be fixed in upcoming versions. Alternately, the use of ice chunk will also likely solve this issue (ice chunk functionality to be tested soon). 

## Requirements, prerequisite knowledge, learning outcomes

#### Requirements to run this notebook

* Earthdata login account: An Earthdata Login account is required to access data from the NASA Earthdata system. Please visit https://urs.earthdata.nasa.gov to register and manage your Earthdata Login account.

* Compute environment: This notebook is meant to be run in the cloud (AWS instance running in us-west-2). We used an `m6i.4xlarge` EC2 instance (16 CPU's, 64 GiB memory) for the parallel computing sections. At minimum we recommend a VM with 10 CPU's to make the parallel computations in Section 2.2.1 faster.

* Optional Coiled account: To run the section on distributed clusters, Create a coiled account (free to sign up), and connect it to an AWS account. For more information on Coiled, setting up an account, and connecting it to an AWS account, see their website [https://www.coiled.io](https://www.coiled.io). 

#### Prerequisite knowledge

* This notebook covers virtualizarr functionality but does not present the high-level ideas behind it. For an understanding of reference files and how they are meant to enhance in-cloud access to file formats that are not cloud optimized (such netCDF, HDF), please see e.g. this [kerchunk page](https://fsspec.github.io/kerchunk/), or [this page on virtualizarr](https://virtualizarr.readthedocs.io/en/latest/).

* Familiarity with the `earthaccess` and `Xarray` packages. Familiarity with directly accessing NASA Earthdata in the cloud. 

* The Cookbook notebook on [Dask basics](https://podaac.github.io/tutorials/notebooks/Advanced_cloud/basic_dask.html) is handy for those new to parallel computating.

#### Learning Outcomes

This notebook serves both as a pedagogical resource for learning several key workflows as well as a quick reference guide. Readers will gain the understanding to combine the virtualizarr and earthaccess packages to create virtual dataset reference files for NASA Earthdata.

## Import Packages
#### ***Note Zarr Version***
***Zarr version 2 is needed for the current implementation of this notebook, due to (as of February 2025) Zarr version 3 not accepting `FSMap` objects.***

We ran this notebook in a Python 3.12 environment. The minimal working environment we used to run this notebook was:
```
zarr==2.18.4
fastparquet==2024.5.0
xarray==2025.1.2
earthaccess==0.11.0
fsspec==2024.10.0
dask==2024.5.2 ("dask[complete]"==2024.5.2 if using pip)
h5netcdf==1.3.0
matplotlib==3.9.2
jupyterlab
jupyter-server-proxy
virtualizarr==1.3.0
kerchunk==0.2.7
```
And optionally:
```
coiled==1.58.0
```

In [2]:
pip install earthaccess>=0.11.0 zarr>=2.18.4 fastparquet>=2024.5.0 xarray>=2024.1.0  fsspec>=2024.10.0 "dask[complete]">=2024.5.2 h5netcdf>=1.3.0 ujson>=5.10.0 matplotlib>=3.9.2  kerchunk==0.2.7 virtualizarr>=1.3.0 

Note: you may need to restart the kernel to use updated packages.


In [2]:
#!pip list 

In [3]:
# Built-in packages
import os
import sys

# Filesystem management 
import fsspec
import earthaccess

# Data handling
import xarray as xr
from virtualizarr import open_virtual_dataset

# Parallel computing 
import multiprocessing
from dask import delayed
import dask.array as da
from dask.distributed import Client

# parquet zip file
import shutil

# Other
import matplotlib.pyplot as plt

## Other Setup

In [4]:
xr.set_options( # display options for xarray objects
    display_expand_attrs=False,
    display_expand_coords=True,
    display_expand_data=True,
)

<xarray.core.options.set_options at 0x7f8a0fdceae0>

## 1. Get Data File S3 endpoints in Earthdata Cloud 
The first step is to find the S3 endpoints to the files. Handling access credentials to Earthdata and then finding the endpoints can be done a number of ways (e.g. using the `requests`, `s3fs` packages) but we use the `earthaccess` package for its ease of use. We get the endpoints for all files in the CCMP record.

In [5]:
# Get Earthdata creds
earthaccess.login()

Enter your Earthdata Login username:  edward.m.armstrong
Enter your Earthdata password:  ········


<earthaccess.auth.Auth at 0x7f89f515c770>

In [6]:
# Get AWS creds. Note that if you spend more than 1 hour in the notebook, you may have to re-run this line!!!
fs = earthaccess.get_s3_filesystem(daac="PODAAC")

In [7]:
%%time

# Locate OSTIA REProcessed  dataset file metadata:
granule_info = earthaccess.search_data(
    short_name="OSTIA-UKMO-L4-GLOB-REP-v2.0",
    provider="POCLOUD",
    temporal=("1982-01-01", "2023-12-31")
    )

CPU times: user 375 ms, sys: 76.6 ms, total: 452 ms
Wall time: 8.23 s


In [8]:
# Extract S3 endpoints for all files from metadata:
data_s3links = [g.data_links(access="direct")[0] for g in granule_info]
print(data_s3links[0:3])
print(len(data_s3links))

['s3://podaac-ops-cumulus-protected/OSTIA-UKMO-L4-GLOB-REP-v2.0/1982/001/19820101120000-UKMO-L4_GHRSST-SSTfnd-OSTIA-GLOB_REP-v02.0-fv02.0.nc', 's3://podaac-ops-cumulus-protected/OSTIA-UKMO-L4-GLOB-REP-v2.0/1982/002/19820102120000-UKMO-L4_GHRSST-SSTfnd-OSTIA-GLOB_REP-v02.0-fv02.0.nc', 's3://podaac-ops-cumulus-protected/OSTIA-UKMO-L4-GLOB-REP-v2.0/1982/003/19820103120000-UKMO-L4_GHRSST-SSTfnd-OSTIA-GLOB_REP-v02.0-fv02.0.nc']
15340


## 2. Generate reference files for the OSTIA REP dataset

### 2.1 Try just the first day


***Important***

The kwarg `loadable_variables` is not mandatory to create a viable reference file, but will become important for rapid lazy loading when working with large combined reference files. Assign to this at minimum the list of 1D coordinate variable names for the data set (additional 1D or scalar vars can also be added). This functionality will be the default in future releases of virtualizarr.

In [9]:
# This will be assigned to 'loadable_variables' and needs to be modified per the specific 
# coord names of the data set:
coord_vars = ["lat","lon","time"]

In [10]:
%%time
reader_opts = {"storage_options": fs.storage_options} # S3 filesystem creds from previous section.

# Create reference for the first data file:
virtual_ds_example = open_virtual_dataset(
    data_s3links[0], indexes={}, 
    reader_options=reader_opts, loadable_variables=coord_vars
    )
print(virtual_ds_example)

<xarray.Dataset> Size: 648MB
Dimensions:           (time: 1, lat: 3600, lon: 7200)
Coordinates:
  * time              (time) datetime64[ns] 8B 1982-01-01T12:00:00
  * lat               (lat) float32 14kB -89.97 -89.93 -89.88 ... 89.93 89.97
  * lon               (lon) float32 29kB -180.0 -179.9 -179.9 ... 179.9 180.0
Data variables:
    analysed_sst      (time, lat, lon) float64 207MB ManifestArray<shape=(1, ...
    analysis_error    (time, lat, lon) float64 207MB ManifestArray<shape=(1, ...
    sea_ice_fraction  (time, lat, lon) float64 207MB ManifestArray<shape=(1, ...
    mask              (time, lat, lon) int8 26MB ManifestArray<shape=(1, 3600...
Attributes: (47)
CPU times: user 273 ms, sys: 47.9 ms, total: 321 ms
Wall time: 999 ms


The reference can be saved to file and used to open the corresponding CCMP data file with Xarray:

In [11]:
virtual_ds_example.virtualize.to_kerchunk('virtual_ds_example.json', format='json')

In [12]:
# Open data using the reference file, using a small wrapper function around xarray's open_dataset. 
# This will shorten code blocks in other sections. 
def opends_withref(ref, fs_data):
    """
    "ref" is a reference file or object. "fs_data" is a filesystem with credentials to
    access the actual data files. 
    """
    storage_opts = {"fo": ref, "remote_protocol": "s3", "remote_options": fs_data.storage_options}
    fs_ref = fsspec.filesystem('reference', **storage_opts)
    m = fs_ref.get_mapper('')
    data = xr.open_dataset(
        m, engine="zarr", chunks={},
        backend_kwargs={"consolidated": False}
    )
    return data

In [13]:
data_example = opends_withref('virtual_ds_example.json', fs)
print(data_example)

<xarray.Dataset> Size: 726MB
Dimensions:           (time: 1, lat: 3600, lon: 7200)
Coordinates:
  * lat               (lat) float32 14kB -89.97 -89.93 -89.88 ... 89.93 89.97
  * lon               (lon) float32 29kB -180.0 -179.9 -179.9 ... 179.9 180.0
  * time              (time) datetime64[ns] 8B 1982-01-01T12:00:00
Data variables:
    analysed_sst      (time, lat, lon) float64 207MB dask.array<chunksize=(1, 1200, 2400), meta=np.ndarray>
    analysis_error    (time, lat, lon) float64 207MB dask.array<chunksize=(1, 1200, 2400), meta=np.ndarray>
    mask              (time, lat, lon) float32 104MB dask.array<chunksize=(1, 1800, 3600), meta=np.ndarray>
    sea_ice_fraction  (time, lat, lon) float64 207MB dask.array<chunksize=(1, 1800, 3600), meta=np.ndarray>
Attributes: (47)


In [14]:
# Also useful to note, these reference objects don't take much memory:
print(sys.getsizeof(virtual_ds_example), "bytes")

120 bytes


### 2.2 Now do the entire record 
Reference information for each data file in the entire record is created individually in memory, and then the combined reference file for the record can be created.

For us, reference file creation for a single file takes about 0.7 seconds, so processing a year of files would take about 4.25 minuts. One can easly accomplish this with a for-loop:

```
virtual_ds_list = [
    open_virtual_dataset(
        p, indexes={},
        reader_options={"storage_options": fs.storage_options},
        loadable_variables=coord_vars
        )
    for p in data_s3links
    ]
```

However, we speed things up using basic parallel computing. 

### 2.2.1 Method 1: parallelize using Dask local cluster
If using an `m6i.8xlarge` AWS EC2 instance, there are 32 CPUs available and each should have enough memory to utilize all at once. If working on a different VM-type, change the `n_workers` in the call to `Client()` below as needed.

In [15]:
# Check how many cpu's are on this VM:
print("CPU count =", multiprocessing.cpu_count())

CPU count = 32


In [16]:
# Start up cluster and print some information about it:
client = Client(n_workers=15, threads_per_worker=1)
print(client.cluster)
print("View any work being done on the cluster here", client.dashboard_link)

LocalCluster(f5360d3e, 'tcp://127.0.0.1:41887', workers=15, threads=15, memory=122.31 GiB)
View any work being done on the cluster here https://cluster-dwzfu.dask.host/jupyter/proxy/8787/status


In [17]:
%%time
# Create individual references in memory (this may take a while):
open_vds_par = delayed(open_virtual_dataset)
tasks = [
    open_vds_par(p, indexes={}, reader_options=reader_opts, loadable_variables=coord_vars) 
    for p in data_s3links[:] # all files !
    ]
virtual_ds_list = list(da.compute(*tasks)) # The xr.combine_nested() function below needs a list rather than a tuple.

CPU times: user 3min 21s, sys: 27.9 s, total: 3min 49s
Wall time: 19min 50s


Using the individual references to create the combined reference is fast and does not requre parallel computing.

In [18]:
%%time
# Create the combined reference
virtual_ds_combined = xr.combine_nested(virtual_ds_list, concat_dim='time', coords='minimal', compat='override', combine_attrs='drop_conflicts')

CPU times: user 11.3 s, sys: 438 ms, total: 11.7 s
Wall time: 11.4 s


In [23]:
# Save in JSON or PARQUET format:
fname_combined_json = 'OSTIA-UKMO-L4-GLOB-REP-v2.0_combined-ref.json'
fname_combined_parq = 'OSTIA-UKMO-L4-GLOB-REP-v2.0_combined-ref.parq'
virtual_ds_combined.virtualize.to_kerchunk(fname_combined_json, format='json')
virtual_ds_combined.virtualize.to_kerchunk(fname_combined_parq, format='parquet')

In [24]:
%%time
# Test lazy loading of the combine reference file JSON:
data_json = opends_withref(fname_combined_json, fs)
print(data_json)

<xarray.Dataset> Size: 11TB
Dimensions:           (time: 15340, lat: 3600, lon: 7200)
Coordinates:
  * lat               (lat) float32 14kB -89.97 -89.93 -89.88 ... 89.93 89.97
  * lon               (lon) float32 29kB -180.0 -179.9 -179.9 ... 179.9 180.0
  * time              (time) datetime64[ns] 123kB 1982-01-01T12:00:00 ... 202...
Data variables:
    analysed_sst      (time, lat, lon) float64 3TB dask.array<chunksize=(1, 1200, 2400), meta=np.ndarray>
    analysis_error    (time, lat, lon) float64 3TB dask.array<chunksize=(1, 1200, 2400), meta=np.ndarray>
    mask              (time, lat, lon) float32 2TB dask.array<chunksize=(1, 1800, 3600), meta=np.ndarray>
    sea_ice_fraction  (time, lat, lon) float64 3TB dask.array<chunksize=(1, 1800, 3600), meta=np.ndarray>
Attributes: (39)
CPU times: user 3.04 s, sys: 261 ms, total: 3.3 s
Wall time: 3.2 s


In [25]:
%%time
# Test lazy loading of the combine reference file PARQUET:
data_parq = opends_withref(fname_combined_parq, fs)
print(data_parq)

<xarray.Dataset> Size: 11TB
Dimensions:           (time: 15340, lat: 3600, lon: 7200)
Coordinates:
  * lat               (lat) float32 14kB -89.97 -89.93 -89.88 ... 89.93 89.97
  * lon               (lon) float32 29kB -180.0 -179.9 -179.9 ... 179.9 180.0
  * time              (time) datetime64[ns] 123kB 1982-01-01T12:00:00 ... 202...
Data variables:
    analysed_sst      (time, lat, lon) float64 3TB dask.array<chunksize=(1, 1200, 2400), meta=np.ndarray>
    analysis_error    (time, lat, lon) float64 3TB dask.array<chunksize=(1, 1200, 2400), meta=np.ndarray>
    mask              (time, lat, lon) float32 2TB dask.array<chunksize=(1, 1800, 3600), meta=np.ndarray>
    sea_ice_fraction  (time, lat, lon) float64 3TB dask.array<chunksize=(1, 1800, 3600), meta=np.ndarray>
Attributes: (39)
CPU times: user 41.5 ms, sys: 3.55 ms, total: 45.1 ms
Wall time: 43.1 ms


In [26]:
# write out the zipped  parquet directory  

# Zip PARQUET directory
shutil.make_archive(fname_combined_parq, 'zip', fname_combined_parq)

'/scratch/notebook/ref_combined_OSTIA-UKMO-L4-GLOB-REP-v2.0.parq.zip'

In [None]:
open_vds_par.cluster.shutdown()

Using the individual references to create the combined reference is fast and does not requre parallel computing.

For us, lazy loading the entire record took ~3 seconds. Compare that to an attempt at opening these same files with `Xarray` the "traditional" way with a call to `xr.open_mfdataset()`. On a smaller machine, the following line of code will either fail or take a long (possibly very long) amount of time:

In [None]:
## You can try un-commenting and running this but your notebook will probably stall or crash:
# fobjs = earthaccess.open(granule_info)
# data = xr.open_mfdataset(fobjs[:])