# Pangeo Forge Recipe: LIS NetCDF to Zarr Store

This notebook follows Pangeo Forge Recipe's [sequential NetCDF -> Zarr tutorial](https://pangeo-forge.readthedocs.io/en/latest/tutorials/netcdf_zarr_sequential.html) from the documentation to convert sequential NetCDF files to a consolidated Zarr store. We've modified the workflow described in the tutorial to read NetCDFs hosted on S3 and write the Zarr store back to S3.

**Important**: to prevent accidentally overwriting an existing Zarr store by executing this notebook we have commented out the final two cells and inserted a `'TEST'` into the target output path. If you wish to run this notebook yourself, be sure to fix these elements first.

In [55]:
# S3 filesystem, AWS SDK (boto3), tempfile utility module, and logging
import s3fs, boto3, tempfile, logging

# progress bar in notebook
from tqdm.notebook import tqdm

# local filesystem utility
from fsspec.implementations.local import LocalFileSystem

# pangeo forge recipe classes and functions
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.recipes import XarrayZarrRecipe
from pangeo_forge_recipes.storage import FSSpecTarget, CacheFSSpecTarget, MetadataTarget, StorageConfig

# numpy, pandas, xarray
import numpy as np
import pandas as pd
import xarray as xr

s3 = s3fs.S3FileSystem(anon=False)

# define bucket name
bucket_name = 'eis-dh-hydro'

s3_boto = boto3.resource('s3')
bucket = s3_boto.Bucket('eis-dh-hydro')

Set `debug = True` below to enable logging:

In [2]:
debug = False

if debug:
    logger = logging.getLogger('pangeo_forge_recipes')
    formatter = logging.Formatter('%(name)s:%(levelname)s - %(message)s')
    handler = logging.StreamHandler()
    handler.setLevel(logging.INFO)
    handler.setFormatter(formatter)
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)

Glob a list of input file URLs:

In [3]:
# define filesystem protocol
protocol = 's3://'

# define input directory within bucket
netcdf_dir = 'LIS_NETCDF'
ds_dir = 'DELTA_2km/9CONST_RA_LAKE/ROUTING'

# define url pattern
url_pattern = protocol + '/'.join([bucket_name, netcdf_dir, ds_dir, '**/LIS_HIST*.nc'])

# build input urls
input_urls = [protocol + s for s in s3.glob(url_pattern)]

# inspect a url
input_urls[0]

's3://eis-dh-hydro/LIS_NETCDF/DELTA_2km/9CONST_RA_LAKE/ROUTING/199201/LIS_HIST_199201010000.d01.nc'

Create a Pangeo Forge `pattern`:

In [4]:
# define recipe file pattern
pattern = pattern_from_file_sequence(input_urls, 'time', nitems_per_file=1)

pattern

<FilePattern {'time': 10594}>

Inspect the data in the pattern:

In [5]:
# pattern is designed to be iterated over, so get the first key:
for key in pattern:
    break
key

(DimIndex(name='time', index=0, sequence_len=10594, operation=<CombineOp.CONCAT: 2>))

Inspect the pattern key:

In [6]:
pattern[key]

's3://eis-dh-hydro/LIS_NETCDF/DELTA_2km/9CONST_RA_LAKE/ROUTING/199201/LIS_HIST_199201010000.d01.nc'

Here we define the chunking scheme for the target Zarr store. Chunks should ideally be between 10-100mb. Let's inspect the NetCDF filesizes to determine how we should chunk the Zarr store:

In [41]:
obj_prefix = 'LIS_NETCDF/DELTA_2km/9CONST_RA_LAKE/ROUTING/'

nc_filesizes = []

# get size of each object in bucket that matches obj_prefix pattern
for obj in bucket.objects.filter(Prefix=obj_prefix):
    nc_filesizes.append(obj.size)

In [41]:
# convert to dataframe for quick stat summary
nc_filesizes_df = pd.DataFrame({'filesize_kb': nc_filesizes})
nc_filesizes_df['filesize_mb'] = nc_filesizes_df['filesize_kb'] / 1e6

nc_filesizes_df.describe()

Unnamed: 0,filesize_kb,filesize_mb
count,10594.0,10594.0
mean,4650462.0,4.650462
std,0.0,3.615057e-13
min,4650462.0,4.650462
25%,4650462.0,4.650462
50%,4650462.0,4.650462
75%,4650462.0,4.650462
max,4650462.0,4.650462


Since all of the files are about 4.65MB we'll chunk the time dimension in groups of 365 to create chunks about 100mb in size. (Note: we don't really need to define our target chunks like this because we are using a simple chunking strategy that aligns with the concatenation dimension (`time`) we specified when creating the `FilePattern` object above. Instead we can simply set `input_per_chunks` to the desired chunk size and Pangeo Forge will handle the rest. [See the docs for more.](https://pangeo-forge.readthedocs.io/en/latest/recipe_user_guide/recipes.html#api-documentation))

In [46]:
target_chunks = {'time': 365}

Next we define the location of our temporary cache, metadata cache, and output target.

In [56]:
##### Create storage configuration for target and caches #####
    
def create_storage_config():

    # define local FS object
    fs_local = LocalFileSystem()

    # create cache FS object (uses temp_dir passed in from config)
    fs_temp = CacheFSSpecTarget(fs_local, temp_dir)

    # create target FS object
    target_url = build_url(target_path)
    fs_target = FSSpecTarget(fs=s3, root_path=target_url)

    # create metadata target path and FS object
    meta_dir = tempfile.TemporaryDirectory(dir=temp_dir)
    fs_meta = MetadataTarget(fs_local, meta_dir.name)

    # create storage configuration for target and caches
    return StorageConfig(fs_target, fs_temp, fs_meta)

XarrayZarrRecipe(file_pattern=<FilePattern {'time': 10594}>, inputs_per_chunk=10, target_chunks={'time': 10}, target=FSSpecTarget(fs=<s3fs.core.S3FileSystem object at 0x7fec140ee280>, root_path='eis-dh-hydro/TEST/LIS/DELTA_2km/9CONST_RA_LAKE/ROUTING/LIS_HIST.d01.zarr'), input_cache=None, metadata_cache=MetadataTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7febcd2b6640>, root_path='/home/jovyan/efs/tmp/tmpwc9fee4x'), cache_inputs=False, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={}, xarray_concat_kwargs={}, delete_input_encoding=True, fsspec_open_kwargs={}, process_input=None, process_chunk=<function preprocess at 0x7febd5963280>, lock_timeout=None, subset_inputs={}, is_opendap=False)

Define preprocesing functions:

In [None]:
# define LIS output specific preprocessing func
def add_latlon_coords(ds: xr.Dataset)->xr.Dataset:
    """
    Adds lat/lon as dimensions and coordinates to an xarray.Dataset object.

    LIS NetCDF output contains lat/lon as variables (2D arrays) and logical indices
    for the grid in coordinates named 'east_west' and 'north_south'.

    It is not possible to assign the lat/lon values as coordinates directly because
    LIS outputs may contain masked areas where the lat/lon values are
    set to a nodata value and xarray does not allow use of a coordinates
    that contain NaN values. Instead, we build the coordinate fields based
    on the grid description in the NetCDF metadata. It ain't pretty, but it works.
    """

    # get attributes from dataset
    attrs = ds.attrs

    # get x and y resolution from metadata
    dx = round(float(attrs['DX']), 3)
    dy = round(float(attrs['DY']), 3)

    # get number of grid cells in x, y dimensions from metadata
    ew_len = len(ds['east_west'])
    ns_len = len(ds['north_south'])

    # get lower-left lat and lon from metadata
    ll_lat = round(float(attrs['SOUTH_WEST_CORNER_LAT']), 3)
    ll_lon = round(float(attrs['SOUTH_WEST_CORNER_LON']), 3)

    # calculate upper-right lat and lon
    ur_lat =  ll_lat + (dy * ns_len)
    ur_lon = ll_lon + (dx * ew_len)

    # define the new coordinates
    coords = {
        # create arrays containing the lat/lon at each gridcell
        'lat': np.linspace(ll_lat, ur_lat, ns_len, dtype=np.float32, endpoint=False),
        'lon': np.linspace(ll_lon, ur_lon, ew_len, dtype=np.float32, endpoint=False)
    }

    # rename the original lat and lon variables to preserve them
    ds = ds.rename({'lon':'orig_lon', 'lat':'orig_lat'})
    
    # rename the grid dimensions to lat and lon
    ds = ds.rename({'north_south': 'lat', 'east_west': 'lon'})

    # assign the coords above as coordinates and add original metadata
    ds = ds.assign_coords(coords)
    ds.lon.attrs = ds.orig_lon.attrs
    ds.lat.attrs = ds.orig_lat.attrs

    return ds

# define preprocessing function

def preprocess(ds: xr.Dataset) -> xr.Dataset:
    
    ds = add_latlon_coords(ds)
    
    return ds

Create the `XarrayZarrRecipe` object:

In [None]:
recipe = XarrayZarrRecipe(pattern,                             # file URL pattern
                            inputs_per_chunk=inputs_per_chunk, # input files per chunk
                            storage_config=create_storage_config(),     # storage configuration for target and caches
                            process_chunk=preprocess,          # preprocess func
                            cache_inputs=False,                # read inputs directly from S3
                            target_chunks=target_chunks)       # set chunking scheme for output

Inspect the recipe:

In [None]:
recipe

XarrayZarrRecipe(file_pattern=<FilePattern {'time': 10594}>, inputs_per_chunk=10, target_chunks={'time': 10}, target=None, input_cache=None, metadata_cache=None, cache_inputs=False, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={}, xarray_concat_kwargs={}, delete_input_encoding=True, fsspec_open_kwargs={}, process_input=None, process_chunk=<function preprocess at 0x7febd5963280>, lock_timeout=None, subset_inputs={}, is_opendap=False)

Check that the recipe contains all of our inputs:

In [None]:
all_inputs = list(recipe.iter_inputs())
len(all_inputs)

10594

Check the number of chunks that will be created (10594/365 ~= 29):

In [None]:
all_chunks = list(recipe.iter_chunks())
len(all_chunks)

1060

The cell below will prepare our target location by creating a template Zarr store that matches our chunking strategy.

In [21]:
#recipe.prepare_target()

1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  return xr.open_zarr(target.get_mapper())


The cell below actually executes the recipe to perform the conversion. Lastly, the recipe is finalized (metadata consolidation?).

In [23]:
# for chunk in tqdm(recipe.iter_chunks(), total=len(all_chunks)):
#     recipe.store_chunk(chunk)
    
# recipe.finalize_target()

  0%|          | 0/106 [00:00<?, ?it/s]

Once the conversion has finished, we check that the Zarr store exists and has the expected structure:

In [60]:
# test_ds = xr.open_zarr(s3.get_mapper('s3://' + target_path), consolidated=True)
test_ds = xr.open_zarr(s3.get_mapper('s3://' + '/'.join([bucket_name, zarr_dir, ds_dir, zarr_name])), consolidated=True)

In [61]:
test_ds

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4.40 GB 4.16 MB Shape (10594, 270, 385) (10, 270, 385) Count 1061 Tasks 1060 Chunks Type float32 numpy.ndarray",385  270  10594,

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4.40 GB 4.16 MB Shape (10594, 270, 385) (10, 270, 385) Count 1061 Tasks 1060 Chunks Type float32 numpy.ndarray",385  270  10594,

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4.40 GB 4.16 MB Shape (10594, 270, 385) (10, 270, 385) Count 1061 Tasks 1060 Chunks Type float32 numpy.ndarray",385  270  10594,

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4.40 GB 4.16 MB Shape (10594, 270, 385) (10, 270, 385) Count 1061 Tasks 1060 Chunks Type float32 numpy.ndarray",385  270  10594,

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4.40 GB 4.16 MB Shape (10594, 270, 385) (10, 270, 385) Count 1061 Tasks 1060 Chunks Type float32 numpy.ndarray",385  270  10594,

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4.40 GB 4.16 MB Shape (10594, 270, 385) (10, 270, 385) Count 1061 Tasks 1060 Chunks Type float32 numpy.ndarray",385  270  10594,

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4.40 GB 4.16 MB Shape (10594, 270, 385) (10, 270, 385) Count 1061 Tasks 1060 Chunks Type float32 numpy.ndarray",385  270  10594,

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4.40 GB 4.16 MB Shape (10594, 270, 385) (10, 270, 385) Count 1061 Tasks 1060 Chunks Type float32 numpy.ndarray",385  270  10594,

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4.40 GB 4.16 MB Shape (10594, 270, 385) (10, 270, 385) Count 1061 Tasks 1060 Chunks Type float32 numpy.ndarray",385  270  10594,

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4.40 GB 4.16 MB Shape (10594, 270, 385) (10, 270, 385) Count 1061 Tasks 1060 Chunks Type float32 numpy.ndarray",385  270  10594,

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4.40 GB 4.16 MB Shape (10594, 270, 385) (10, 270, 385) Count 1061 Tasks 1060 Chunks Type float32 numpy.ndarray",385  270  10594,

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray


In [62]:
test_ds.chunks

Frozen({'time': (10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10,

In [63]:
test_ds['FloodStor_tavg']

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4.40 GB 4.16 MB Shape (10594, 270, 385) (10, 270, 385) Count 1061 Tasks 1060 Chunks Type float32 numpy.ndarray",385  270  10594,

Unnamed: 0,Array,Chunk
Bytes,4.40 GB,4.16 MB
Shape,"(10594, 270, 385)","(10, 270, 385)"
Count,1061 Tasks,1060 Chunks
Type,float32,numpy.ndarray
