# 📚 How to Build an Intake-ESM Catalog  

**Author:** Nicole Keeney  
**Creation Date:** March 2025  
**Last Modified:** N/A  

## 📖 Overview  
This notebook provides a step-by-step guide to building an **Intake-ESM** catalog for zarr stores in an S3 bucket, including how to use `ecgtools` to write a custom parser and structure the datasets for use with `intake-esm`.

This notebook is divided into four steps: 
1) Build a custom parser function 
2) Build the catalog object 
3) Export the catalog files
4) Read in some files and verify that it all worked

In [2]:
import pandas as pd
import s3fs 
import traceback
from tqdm import tqdm
import intake 
import xarray as xr 
from ecgtools import Builder
from ecgtools.builder import INVALID_ASSET, TRACEBACK



## Step 1: Build a custom parser function 
This function extracts metadata from each filepath so that the `Builder` can use it to generate the catalog. The filepaths passed into the parser point to the `.zmetadata` file inside each zarr store, but the `path` key in the output dictionary points to the zarr store itself, **without** `.zmetadata`. I realize this is confusing, but it's a hacky way to get around some inflexibility in the `Builder` class when working with zarrs, which are directories rather than single files. See the section **Additional notes on the inputs to Builder: include_patterns** for more info.
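To make the `.zmetadata` vs. store-path distinction concrete, here's a stdlib-only sketch of a stricter take on the path handling. It assumes every asset follows the layout seen in this notebook's bucket (`s3://wfclimres/<institution>/<installation>/<source>/<experiment>/<table>/<variable>/<grid>/.zmetadata`); `split_metadata_path`, `FIELDS`, and `PREFIX` are hypothetical names, not part of ecgtools.

```python
from pathlib import PurePosixPath

# Hypothetical constants describing the assumed bucket layout
PREFIX = "s3://wfclimres/"
FIELDS = ("institution_id", "installation", "source_id", "experiment_id",
          "table_id", "variable_id", "grid_label")


def split_metadata_path(metadata_path):
    """Return (fields, store_path) for a path ending in <store>/.zmetadata.

    Raises ValueError instead of guessing when the layout is unexpected.
    """
    if not metadata_path.startswith(PREFIX):
        raise ValueError(f"Not under {PREFIX}: {metadata_path}")
    relative = PurePosixPath(metadata_path[len(PREFIX):])
    # Exactly one directory level per catalog column, then the .zmetadata file
    if relative.name != ".zmetadata" or len(relative.parts) != len(FIELDS) + 1:
        raise ValueError(f"Unexpected layout: {metadata_path}")
    fields = dict(zip(FIELDS, relative.parts[:-1]))
    # The zarr store is the directory that contains .zmetadata
    store_path = f"{PREFIX}{relative.parent}/"
    return fields, store_path


fields, store_path = split_metadata_path(
    "s3://wfclimres/era/pv_distributed/ec-earth3/historical/1hr/cf/d03/.zmetadata"
)
print(fields["installation"], fields["variable_id"])  # → pv_distributed cf
print(store_path)  # → s3://wfclimres/era/pv_distributed/ec-earth3/historical/1hr/cf/d03/
```

The raw directory names still need mapping to catalog values (e.g. `ec-earth3` to `EC-Earth3`), and inside a real parser you'd keep the `try/except` pattern so a bad path is recorded as an invalid asset rather than raising.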

In [3]:
def parse_ae_ren_data(filepath):
    """
    Parses the S3 filepath to extract metadata for climate simulation data.
    
    Extracts information like installation, simulation model, experiment, 
    frequency, variable, grid resolution, and the file path (without `.zmetadata`).

    Parameters
    ----------
    filepath : str
        The S3 URL of the `.zmetadata` file inside a zarr store.

    Returns
    -------
    dict
        A dictionary with parsed metadata:
        - installation, activity_id, institution_id, source_id, experiment_id, 
          table_id, variable_id, grid_label, path.
        If parsing fails, returns a dictionary with the error details:
        - INVALID_ASSET and TRACEBACK.

    Example
    -------
    >>> parse_ae_ren_data('s3://wfclimres/era/pv_distributed/ec-earth3/historical/1hr/cf/d03/.zmetadata')
    {
        "installation": "pv_distributed",
        "activity_id": "WRF",
        "institution_id": "ERA",
        "source_id": "EC-Earth3",
        "experiment_id": "historical",
        "table_id": "1hr",
        "variable_id": "cf",
        "grid_label": "d03",
        "path": "s3://wfclimres/era/pv_distributed/ec-earth3/historical/1hr/cf/d03/"
    }

    Notes
    -----
    The `try/except` block handles errors in extracting information from the `filepath`. 
    If the filepath structure does not match the expected format, or the model directory 
    is not in `simulation_dict`, the `except` block captures the exception and returns 
    a dictionary with the invalid filepath and the traceback.
    """
    # Simulation string mapping (lowercase directory name -> source_id used in the catalog)
    simulation_dict = {
        "ec-earth3": "EC-Earth3",
        "mpi-esm1-2-hr": "MPI-ESM1-2-HR",
        "miroc6": "MIROC6",
        "taiesm1": "TaiESM1",
        "era5": "ERA5"
    }

    try:
        # Get the data info from the filepath, expected to look like
        # s3://wfclimres/<institution>/<installation>/<source>/<experiment>/<table>/<variable>/<grid>/.zmetadata
        (institution_id, installation, source_id, experiment_id,
         table_id, variable_id, grid_label, _) = filepath.split("s3://wfclimres/")[1].split("/")
        # Remove .zmetadata from the filepath, since the actual path to the zarr doesn't include this
        zarr_path = filepath.split(".zmetadata")[0]
        # Look up the model name inside the try, so an unknown model is flagged as invalid instead of raising a KeyError
        source_id = simulation_dict[source_id]
    except Exception:
        # If an error occurs (e.g., wrong filepath structure or unknown model), return error details
        return {INVALID_ASSET: filepath, TRACEBACK: traceback.format_exc()}

    # Add filepath info to dictionary
    info = {
        "installation": installation,
        "activity_id": "WRF",
        "institution_id": "ERA",
        "source_id": source_id,
        "experiment_id": experiment_id,
        "table_id": table_id,
        "variable_id": variable_id,
        "grid_label": grid_label,
        "path": zarr_path
    }

    return info


## Step 2: Build the catalog object
Using the custom parser, we'll create a `Builder` object and build the catalog. The exact code depends on how you point the `Builder` at your data, so I'll show a few different methods below.<br><br>
Each method will use some variation of the following code: 
```python 
# Base Builder object 
b = Builder(paths=["s3://path-to-data-directory"]) 

# Build the catalog using a custom parsing function (you need to define this function for your unique data structure)
b.build(parsing_func=custom_parsing_func)

# Exclude invalid assets and remove duplicate entries
b.clean_dataframe()

# View your built catalog as a dataframe :) 
b.df
```


### Method 1: Feed the Builder allllll the filepaths (no crawling required)
The Builder won't do any crawling of your data bucket, because you've oh so kindly fed it all the filepaths it needs. This method requires **you** to do the crawling beforehand to generate a list of these filepaths. 

Below, I've written some code to crawl through the renewables S3 bucket and collect the paths of all the zarr stores I want to include in the catalog.

In [4]:
fs = s3fs.S3FileSystem()

# Use these to filter the s3 bucket 
installations = ["pv_distributed", "pv_utility", "windpower_offshore", "windpower_onshore"]
source_ids = ["ec-earth3", "miroc6", "mpi-esm1-2-hr", "taiesm1", "era5"]

# Total iterations for tqdm
total_iterations = len(installations) * len(source_ids)

filepaths = [] # Store all filepaths here 
with tqdm(total=total_iterations, desc="Scanning S3", unit="query") as pbar:
    for installation in installations:
        for source_id in source_ids:
            # Each consolidated zarr store should have a single .zmetadata file at its root,
            # so globbing for .zmetadata returns one path per store. Globbing the store directory itself
            # would also return everything inside it (variables, coords, etc.), since a zarr is a directory, not a single file
            glob_s3 = fs.glob(f"s3://wfclimres/era/{installation}/{source_id}/**/*.zmetadata")
            zarr_paths = ["s3://"+file.split(".zmetadata")[0] for file in glob_s3] # Remove .zmetadata from the path 
            filepaths += zarr_paths 
            pbar.update(1)  # Update progress bar

print(f"Total files found: {len(filepaths)}")

Scanning S3: 100%|██████████| 20/20 [00:46<00:00,  2.30s/query]

Total files found: 112





Next, feed these filepaths to the Builder. Since these filepaths are already **absolute filepaths**, set the argument ``depth=0``: no crawling required. 

In [5]:
b1 = Builder(paths=filepaths, depth=0, include_patterns=["**/.zmetadata"])
b1.build(parsing_func=parse_ae_ren_data)
b1.clean_dataframe()
b1.df

|     | installation | activity_id | institution_id | source_id | experiment_id | table_id | variable_id | grid_label | path |
|-----|---|---|---|---|---|---|---|---|---|
| 0 | pv_distributed | WRF | ERA | EC-Earth3 | historical | 1hr | cf | d03 | s3://wfclimres/era/pv_distributed/ec-earth3/hi... |
| 1 | pv_distributed | WRF | ERA | EC-Earth3 | historical | 1hr | gen | d03 | s3://wfclimres/era/pv_distributed/ec-earth3/hi... |
| 2 | pv_distributed | WRF | ERA | EC-Earth3 | historical | day | cf | d03 | s3://wfclimres/era/pv_distributed/ec-earth3/hi... |
| 3 | pv_distributed | WRF | ERA | EC-Earth3 | historical | day | gen | d03 | s3://wfclimres/era/pv_distributed/ec-earth3/hi... |
| 4 | pv_distributed | WRF | ERA | EC-Earth3 | ssp370 | 1hr | cf | d03 | s3://wfclimres/era/pv_distributed/ec-earth3/ss... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 107 | windpower_onshore | WRF | ERA | MPI-ESM1-2-HR | ssp370 | 1hr | gen | d03 | s3://wfclimres/era/windpower_onshore/mpi-esm1-... |
| 108 | windpower_onshore | WRF | ERA | TaiESM1 | historical | 1hr | cf | d03 | s3://wfclimres/era/windpower_onshore/taiesm1/h... |
| 109 | windpower_onshore | WRF | ERA | TaiESM1 | historical | 1hr | gen | d03 | s3://wfclimres/era/windpower_onshore/taiesm1/h... |
| 110 | windpower_onshore | WRF | ERA | TaiESM1 | ssp370 | 1hr | cf | d03 | s3://wfclimres/era/windpower_onshore/taiesm1/s... |


### Method 2: Feed the Builder a directory containing your files 
In this method, the Builder does some of the hard work for you by crawling through your directories and looking for files that match your specifications. Thanks, Builder!<br><br>

#### Notes on the "depth" Builder input
In this case, the path to a zarr store in our bucket looks like this: `"s3://wfclimres/era/pv_distributed/ec-earth3/historical/1hr/cf/d03/"`<br>
But the path we are giving the Builder looks like this: `"s3://wfclimres/era/pv_distributed/"`

Thus, the `depth` for this Builder is `5`: we need to crawl through 5 directory levels (`"ec-earth3/historical/1hr/cf/d03/"`) below the root directory to reach a zarr store. 
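As a quick sanity check on the `depth` value, here's a small sketch that counts the directory levels between a root path and a zarr store path. `store_depth` is a hypothetical helper; it assumes both paths use `/` separators and that the store lies under the root.

```python
def store_depth(root, store_path):
    """Count the directory levels between `root` and `store_path`."""
    root = root.rstrip("/") + "/"
    if not store_path.startswith(root):
        raise ValueError(f"{store_path} is not under {root}")
    # Everything after the root, split into non-empty directory names
    levels = [part for part in store_path[len(root):].split("/") if part]
    return len(levels)


store = "s3://wfclimres/era/pv_distributed/ec-earth3/historical/1hr/cf/d03/"
print(store_depth("s3://wfclimres/era/pv_distributed/", store))  # → 5 (one installation as root)
print(store_depth("s3://wfclimres/era/", store))  # → 6 (whole era/ prefix as root)
```

Running it on one real store path before launching a crawl is cheap insurance against an off-by-one `depth`.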

In [6]:
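root_dir = 's3://wfclimres/era/'
installations = ["pv_distributed", "pv_utility", "windpower_offshore", "windpower_onshore"]
# Skip the capitalized model directories so the result matches Method 1, which only globbed the lowercase ones
exclude_patterns = [
    "**/EC-Earth3/**", 
    "**/ERA5/**", 
    "**/MIROC6/**", 
    "**/MPI-ESM1-2-HR/**", 
    "**/TaiESM1/**"
]
b2 = Builder(
    # One root directory per installation
    paths=[f'{root_dir}{installation}/' for installation in installations], 
    # source/experiment/table/variable/grid = 5 directory levels below each root
    depth=5, 
    exclude_patterns=exclude_patterns, 
    # Only hand the .zmetadata file of each zarr store to the parser
    include_patterns=["**/.zmetadata"]
)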
b2.build(parsing_func=parse_ae_ren_data)
b2.clean_dataframe()
b2.df



|     | installation | activity_id | institution_id | source_id | experiment_id | table_id | variable_id | grid_label | path |
|-----|---|---|---|---|---|---|---|---|---|
| 1 | pv_distributed | WRF | ERA | EC-Earth3 | historical | 1hr | cf | d03 | s3://wfclimres/era/pv_distributed/ec-earth3/hi... |
| 3 | pv_distributed | WRF | ERA | EC-Earth3 | historical | 1hr | gen | d03 | s3://wfclimres/era/pv_distributed/ec-earth3/hi... |
| 5 | pv_distributed | WRF | ERA | EC-Earth3 | historical | day | cf | d03 | s3://wfclimres/era/pv_distributed/ec-earth3/hi... |
| 7 | pv_distributed | WRF | ERA | EC-Earth3 | historical | day | gen | d03 | s3://wfclimres/era/pv_distributed/ec-earth3/hi... |
| 9 | pv_distributed | WRF | ERA | EC-Earth3 | ssp370 | 1hr | cf | d03 | s3://wfclimres/era/pv_distributed/ec-earth3/ss... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 215 | windpower_onshore | WRF | ERA | MPI-ESM1-2-HR | ssp370 | 1hr | gen | d03 | s3://wfclimres/era/windpower_onshore/mpi-esm1-... |
| 217 | windpower_onshore | WRF | ERA | TaiESM1 | historical | 1hr | cf | d03 | s3://wfclimres/era/windpower_onshore/taiesm1/h... |
| 219 | windpower_onshore | WRF | ERA | TaiESM1 | historical | 1hr | gen | d03 | s3://wfclimres/era/windpower_onshore/taiesm1/h... |
| 221 | windpower_onshore | WRF | ERA | TaiESM1 | ssp370 | 1hr | cf | d03 | s3://wfclimres/era/windpower_onshore/taiesm1/s... |
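Since the include and exclude patterns decide which assets a crawl keeps, it can help to test them on a few sample paths before a slow run. This sketch uses Python's `fnmatch.fnmatchcase` (case-sensitive glob matching) with a subset of the patterns from the cell above; that ecgtools applies patterns exactly this way is an assumption, so treat it as a rough preview. `keep` is a hypothetical helper, and the capitalized sample path is illustrative.

```python
from fnmatch import fnmatchcase

include_patterns = ["**/.zmetadata"]
exclude_patterns = ["**/EC-Earth3/**", "**/TaiESM1/**"]


def keep(path):
    """True if `path` matches an include pattern and no exclude pattern."""
    included = any(fnmatchcase(path, pattern) for pattern in include_patterns)
    excluded = any(fnmatchcase(path, pattern) for pattern in exclude_patterns)
    return included and not excluded


samples = [
    # Lowercase model directory: kept
    "s3://wfclimres/era/pv_distributed/ec-earth3/historical/1hr/cf/d03/.zmetadata",
    # Illustrative capitalized model directory: excluded
    "s3://wfclimres/era/pv_distributed/EC-Earth3/historical/1hr/cf/d03/.zmetadata",
    # A file inside the zarr store that isn't .zmetadata: not included
    "s3://wfclimres/era/pv_distributed/ec-earth3/historical/1hr/cf/d03/cf/.zarray",
]
for path in samples:
    print(keep(path), path)
```

Using `fnmatchcase` rather than `fnmatch` keeps the check case-sensitive on every OS, which matters here because the capitalized and lowercase model directories differ only in case.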


### Method 2b: Feed the Builder your entire bucket (slow and untested)
In theory, you can also feed the Builder the entire bucket and have it crawl through everything. However, this method is really slow: I haven't been patient enough to wait for it to finish running, so the code below is commented out. I'm leaving it here for documentation's sake, in case it's useful for other S3 buckets in the future. 

#### Notes on the "depth" Builder input
In this case, the path to a zarr store in our bucket looks like this: `"s3://wfclimres/era/pv_distributed/ec-earth3/historical/1hr/cf/d03/"`<br>
But the path we are giving the Builder looks like this: `"s3://wfclimres/era/"`

Thus, the `depth` for this Builder would be `6`: we need to crawl through 6 directory levels (`"pv_distributed/ec-earth3/historical/1hr/cf/d03/"`) below the root directory to reach a zarr store. 

In [8]:
# exclude_patterns = [
#     "s3://wfclimres/era/derived_products/**",
#     "s3://wfclimres/era/resource_data/**",
#     "s3://wfclimres/era/rsrc_drought/**",
#     "s3://wfclimres/era/tmp/**",
#     "s3://wfclimres/era/data-guide_pv-wind.pdf",
#     "**/EC-Earth3/**", 
#     "**/ERA5/**", 
#     "**/MIROC6/**", 
#     "**/MPI-ESM1-2-HR/**", 
#     "**/TaiESM1/**"
# ]
# b3 = Builder(
#     paths=["s3://wfclimres/era/"], 
#     depth=6, 
#     exclude_patterns=exclude_patterns, 
#     include_patterns=["**/.zmetadata"]
# )
# b3.build(parsing_func=parse_ae_ren_data)
# b3.clean_dataframe()
# b3.df

### Confirm that these methods are equal 
If everything went as expected, methods 1 and 2 should return the same result. 

In [9]:
# b2.df's index has gaps (see the index column in its output above), so reset it before comparing
b1.df.equals(b2.df.reset_index(drop=True))

True
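`DataFrame.equals` is strict about row order and index labels, which is why the index had to be reset. If two builders might list assets in different orders, one option is to sort both catalogs by `path` before comparing. Here's a sketch; `catalogs_match` is a hypothetical helper, and the toy frames stand in for `b1.df` and `b2.df`.

```python
import pandas as pd


def catalogs_match(df_a, df_b, sort_column="path"):
    """Compare two catalog dataframes while ignoring row order and index labels."""
    a = df_a.sort_values(sort_column).reset_index(drop=True)
    b = df_b.sort_values(sort_column).reset_index(drop=True)
    # Sort columns too, so column order doesn't matter either
    return a.sort_index(axis=1).equals(b.sort_index(axis=1))


# Toy stand-ins for b1.df and b2.df: same rows, different order and index
df_a = pd.DataFrame({"variable_id": ["cf", "gen"], "path": ["s3://x/cf/", "s3://x/gen/"]})
df_b = pd.DataFrame({"variable_id": ["gen", "cf"], "path": ["s3://x/gen/", "s3://x/cf/"]}, index=[3, 1])
print(catalogs_match(df_a, df_b))  # → True
```

With the real builders this would be `catalogs_match(b1.df, b2.df)`, assuming `path` is unique per row, as it should be after `clean_dataframe()` drops duplicates.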

## Step 3: Export the catalog files
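`save` exports the two files that make up the catalog: a `csv` listing every asset with its attributes, and a `json` file describing the catalog. This is also where we set the `groupby_attrs` and data aggregations that intake-esm uses to combine assets into xarray datasets when reading in data.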

In [10]:
b1.save(
    name='era-ren-collection',
    directory='../catalogs/era-ren-collection',
    # Column name including filepath
    path_column_name='path',
    # Column name including variables
    variable_column_name='variable_id',
    # Data file format - could be netcdf or zarr (in this case, zarr)
    data_format="zarr",
    # Which attributes to groupby when reading in variables using intake-esm
    groupby_attrs=["installation","activity_id","institution_id","source_id","experiment_id","table_id","grid_label"], 
    # Aggregations which are fed into xarray when reading in data using intake
    aggregations=[
        {'type': 'union', 'attribute_name': 'variable_id'},
    ],
    description="Eagle Rock Analytics Renewables Data Catalog"
)

Successfully wrote ESM catalog json file to: file:///Users/nicolekeeney/code/intake-esm-tools/intake-esm-tools/../catalogs/era-ren-collection/era-ren-collection.json
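Because `variable_id` is left out of `groupby_attrs`, assets that differ only in their variable fall into the same group and get combined by the `union` aggregation. As a rough preview of those groups, here's a pandas sketch that joins the `groupby_attrs` values with `.`, the same key format intake-esm reports when loading data. It only approximates intake-esm's grouping; `preview_dataset_keys` is hypothetical and the toy rows mirror the first rows of `b1.df`.

```python
import pandas as pd

groupby_attrs = ["installation", "activity_id", "institution_id", "source_id",
                 "experiment_id", "table_id", "grid_label"]


def preview_dataset_keys(df, attrs):
    """Map each dot-joined group key to the variables that would be unioned into it."""
    keys = df[attrs].astype(str).apply(".".join, axis=1)
    return df.groupby(keys)["variable_id"].apply(sorted).to_dict()


# Toy catalog rows mirroring the first rows of b1.df (paths omitted)
toy = pd.DataFrame({
    "installation": ["pv_distributed"] * 3,
    "activity_id": ["WRF"] * 3,
    "institution_id": ["ERA"] * 3,
    "source_id": ["EC-Earth3"] * 3,
    "experiment_id": ["historical"] * 3,
    "table_id": ["1hr", "1hr", "day"],
    "variable_id": ["cf", "gen", "cf"],
    "grid_label": ["d03"] * 3,
})
for key, variables in preview_dataset_keys(toy, groupby_attrs).items():
    print(key, variables)
```

Calling `preview_dataset_keys(b1.df, groupby_attrs)` on the real catalog would show how many datasets to expect and which variables each one merges.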


## Step 4: Read in some files and verify that it all worked! 
Open the catalog, then search it and load some data.

In [11]:
cat = intake.open_esm_datastore("../catalogs/era-ren-collection/era-ren-collection.json")
cat

|     | unique |
|-----|---|
| installation | 4 |
| activity_id | 1 |
| institution_id | 1 |
| source_id | 5 |
| experiment_id | 3 |
| table_id | 2 |
| variable_id | 2 |
| grid_label | 2 |
| path | 112 |
| derived_variable_id | 0 |


In [12]:
# Access catalog as dataframe and inspect the first few rows
cat_df = cat.df
cat_df.head()

In [13]:
# Form query dictionary
query = {
    # GCM name
    'source_id': 'EC-Earth3',
    # time period - historical or emissions scenario
    'experiment_id': ['historical', 'ssp370'],
    # variable
    'variable_id': 'cf',
    # time resolution 
    'table_id': 'day',
    # grid resolution: d01 = 45km, d02 = 9km, d03 = 3km
    'grid_label': 'd03'
}

# Subset catalog 
cat_subset = cat.search(**query)
cat_subset

|     | unique |
|-----|---|
| installation | 2 |
| activity_id | 1 |
| institution_id | 1 |
| source_id | 1 |
| experiment_id | 2 |
| table_id | 1 |
| variable_id | 1 |
| grid_label | 1 |
| path | 4 |
| derived_variable_id | 0 |
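For plain string values like the ones in `query`, `cat.search` behaves roughly like a row filter on the catalog dataframe: a single value must match, and a list matches any of its entries. A minimal pandas sketch of that behaviour, handy for reasoning about what a query will return; `search_df` is hypothetical, and intake-esm's real `search` has more options that this ignores.

```python
import pandas as pd


def search_df(df, **query):
    """Keep rows where every queried column equals the value (or any value in a list)."""
    mask = pd.Series(True, index=df.index)
    for column, value in query.items():
        allowed = value if isinstance(value, (list, tuple, set)) else [value]
        mask &= df[column].isin(allowed)
    return df[mask]


toy = pd.DataFrame({
    "installation": ["pv_distributed", "pv_utility", "pv_distributed"],
    "experiment_id": ["historical", "ssp370", "ssp370"],
    "table_id": ["day", "day", "1hr"],
    "variable_id": ["cf", "cf", "cf"],
})
subset = search_df(toy, experiment_id=["historical", "ssp370"], table_id="day", variable_id="cf")
print(subset)
```

If the approximation holds, `search_df(cat.df, **query)` should select the same 4 assets that `cat.search(**query)` reports above.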


In [14]:
# Get dataset dictionary 
dsets = cat_subset.to_dataset_dict(
    xarray_open_kwargs={'consolidated': True},
    storage_options={'anon': True}
)


--> The keys in the returned dictionary of datasets are constructed as follows:
	'installation.activity_id.institution_id.source_id.experiment_id.table_id.grid_label'
 |████████████████████████████████████████| 100.00% [4/4 00:01<00:00]
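The keys are the `groupby_attrs` values joined with `.`, so they can be split back into attributes, for example to filter `dsets` by experiment. A small sketch, assuming no attribute value contains a `.` itself (none of the values shown in this notebook do); `key_attrs` is hypothetical.

```python
key_fields = ("installation", "activity_id", "institution_id", "source_id",
              "experiment_id", "table_id", "grid_label")


def key_attrs(key):
    """Split a to_dataset_dict key back into its groupby attributes."""
    values = key.split(".")
    if len(values) != len(key_fields):
        raise ValueError(f"Expected {len(key_fields)} fields, got {len(values)}: {key}")
    return dict(zip(key_fields, values))


attrs = key_attrs("pv_distributed.WRF.ERA.EC-Earth3.ssp370.day.d03")
print(attrs["source_id"], attrs["experiment_id"])  # → EC-Earth3 ssp370
```

A dict comprehension over `dsets.items()` that checks `key_attrs(key)["experiment_id"]` would then pull out, say, only the `ssp370` datasets.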

In [15]:
# Display one of the datasets :) 
dsets["pv_distributed.WRF.ERA.EC-Earth3.ssp370.day.d03"]

Dataset display (the xarray repr was flattened on export, so variable and coordinate names were lost). The array summaries that survive:

| Shape | Chunk shape | Array size | Chunk size | Dask chunks | Data type |
|---|---|---|---|---|---|
| (492, 243) | (87, 42) | 467.02 kiB | 14.27 kiB | 36 | float32 |
| (492, 243, 30682) | (87, 42, 9143) | 13.67 GiB | 127.44 MiB | 144 | float32 |
