# Fast NODD GRIB Aggregations

## Overview

In this tutorial we are going to demonstrate building kerchunk aggregations of **NODD grib2 weather forecasts** fast. This workflow primarily involves [xarray-datatree](https://xarray-datatree.readthedocs.io/en/latest/), [pandas](https://pandas.pydata.org/) and `grib_tree` function released in **kerchunkv0.2.3** for the operation.


### About the Dataset

For this operation we will be looking at GRIB2 files generated by [**NOAA Global Ensemble Forecast System (GEFS)**](https://www.ncei.noaa.gov/products/weather-climate-models/global-ensemble-forecast), is a weather forecast model made up of 21 separate forecasts, or ensemble members. With global coverage, GEFS is produced four times a day with weather forecasts going out to 16 days, with an update frequency of 4 times a day, every 6 hours starting at midnight.

More information on this dataset can be found [here](https://registry.opendata.aws/noaa-gefs)


## Prerequisites
| Concepts | Importance | Notes |
| --- | --- | --- |
| [Kerchunk Basics](../foundations/kerchunk_basics) | Required | Core |
| [Pandas Tutorial](https://foundations.projectpythia.org/core/pandas/pandas.html#) | Required | Core |
| [Kerchunk and Xarray-Datatree](https://projectpythia.org/kerchunk-cookbook/notebooks/using_references/Datatree.html) | Required | IO |
| [Xarray-Datatree Overview](https://xarray-datatree.readthedocs.io/en/latest/quick-overview.html)| Required | IO |

- **Time to learn**: 30 minutes

## Motivation

As we know that **kerchunk**  provides a unified way to represent a variety of chunked, compressed data formats (e.g. NetCDF/HDF5, GRIB2, TIFF, …) by generating *references*. This task flow has ability to build large aggregations from **NODD grib forecasts**
in a fraction of the time using the `idx files`.

## Imports

In [1]:
from kerchunk.grib2 import (
    scan_grib,
    grib_tree, 
    parse_grib_idx, 
    extract_datatree_chunk_index, 
    strip_datavar_chunks,
    build_idx_grib_mapping, 
    map_from_index, 
    reinflate_grib_store,
    AggregationType
)
import copy
import pandas as pd
import datatree
import fsspec
import datetime

## Building the Aggregation directly from the GRIB files

For building the aggregation, we're going to build a hierarchical data model, to view the whole dataset ,from a set of scanned grib messages with the help of `grib_tree` function. This data model can be opened directly using either zarr or xarray datatree. **This way of building the aggregation is very slow**. Here we're going to use `xarray-datatree` to open and view it. 

In [2]:
s3_files = [
    "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af006", 
    "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af012", 
    "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af018",
    "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af024",
    "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af030",
    "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af036",
    "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af042"
]

In [3]:
# converting the references into the hierarchical datamodel
grib_tree_store = grib_tree([group for f in s3_files for group in scan_grib(f, storage_options=dict(anon=True))], remote_options=dict(anon=True))



In [4]:
# Transforming the output to datatree to view it. This tree model the variables
s3_dt = datatree.open_datatree(fsspec.filesystem("reference", fo=grib_tree_store, remote_protocol="s3", remote_options={"anon": True}).get_mapper(""), engine="zarr", consolidated=False)

In [19]:
# In this tree model, the variables are organized into hierarchical groups, first by "stepType" and then by "typeOfLevel."
s3_dt

> **Note**: If trying out this notebook, the above way of building the aggregation will take few minutes for this `GEFS` S3 repository. But in general, it take more time, based on the size of the grib files

## Building the aggregation faster with `idx` files

### Index Dataframe made from a single Grib file

Every **NODD** cloud platform stores the grib file along with its `.idx`(index) file, in *text* format. The purpose of using the `idx` files in the aggregation is that the k(erchunk) index data looks a lot like the idx files that are present for every grib file in NODD's **GCS** and **AWS** archive though. 

**This way of building of aggregation only works for a particular `horizon` file irrespective of the run time of the model.**

Here is what the contents of an `idx` file looks like.

```
1:0:d=2017010100:HGT:10 mb:12 hour fcst:ENS=low-res ctl
2:48163:d=2017010100:TMP:10 mb:12 hour fcst:ENS=low-res ctl
3:68112:d=2017010100:RH:10 mb:12 hour fcst:ENS=low-res ctl
4:79092:d=2017010100:UGRD:10 mb:12 hour fcst:ENS=low-res ctl
5:102125:d=2017010100:VGRD:10 mb:12 hour fcst:ENS=low-res ctl
6:122799:d=2017010100:HGT:50 mb:12 hour fcst:ENS=low-res ctl
7:178898:d=2017010100:TMP:50 mb:12 hour fcst:ENS=low-res ctl
8:201799:d=2017010100:RH:50 mb:12 hour fcst:ENS=low-res ctl
9:224321:d=2017010100:UGRD:50 mb:12 hour fcst:ENS=low-res ctl
10:272234:d=2017010100:VGRD:50 mb:12 hour fcst:ENS=low-res ctl
11:318288:d=2017010100:HGT:100 mb:12 hour fcst:ENS=low-res ctl
12:379010:d=2017010100:TMP:100 mb:12 hour fcst:ENS=low-res ctl
13:405537:d=2017010100:RH:100 mb:12 hour fcst:ENS=low-res ctl
14:441517:d=2017010100:UGRD:100 mb:12 hour fcst:ENS=low-res ctl
15:497421:d=2017010100:VGRD:100 mb:12 hour fcst:ENS=low-res ctl
```

The general format of `idx` data across the **NODD** cloud platforms is: `index:offset:date_with_runtime:variable:forecast_time:`.<br>
The metadata are separated by ":" (colon) and we need to convert it into a `Dataframe` for the mapping.

In [6]:
# converting the idx data into a dataframe
idxdf = parse_grib_idx("s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af006", storage_options=dict(anon=True))
idxdf.head()

Unnamed: 0_level_0,offset,date,attrs,length,idx_uri,grib_uri
idx,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
1,0,d=2017010106,HGT:10 mb:6 hour fcst:ENS=low-res ctl,47493,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...
2,47493,d=2017010106,TMP:10 mb:6 hour fcst:ENS=low-res ctl,19438,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...
3,66931,d=2017010106,RH:10 mb:6 hour fcst:ENS=low-res ctl,10835,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...
4,77766,d=2017010106,UGRD:10 mb:6 hour fcst:ENS=low-res ctl,22625,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...
5,100391,d=2017010106,VGRD:10 mb:6 hour fcst:ENS=low-res ctl,20488,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...


### Building a mapping between the index dataframe and grib metadata

Now we're going to need a mapping from our grib/zarr metadata(stored in the `grib_tree` output) to the attributes in the idx files. They are unique for each time horizon e.g. you need to build a unique mapping for the 1 hour forecast, the 2 hour forecast and so on. So in this step we're going to create a **mapping** for a single grib file and its corresponding `idx` files in order, which will be used in later steps for building the aggregation. 

Before that let's see what **grib data** we're extracting from the datatree. The metadata that we'll be extracting will be static in nature. We're going to use a single node by [accessing](https://projectpythia.org/kerchunk-cookbook/notebooks/using_references/Datatree.html#accessing-the-datatree) it.

In [7]:
# Parsing the grib metadata from a single datatree node and converting it into a dataframe
grib_df = extract_datatree_chunk_index(s3_dt["ulwrf/avg/nominalTop"], grib_tree_store)
grib_df.head()

Unnamed: 0,varname,typeOfLevel,stepType,name,nominalTop,number,step,time,valid_time,uri,offset,length,inline_value
0,ulwrf,nominalTop,avg,Upward long-wave radiation flux,0.0,0,0 days 06:00:00,2017-01-01 06:00:00,2017-01-01 12:00:00,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...,3924345,43221,
1,ulwrf,nominalTop,avg,Upward long-wave radiation flux,0.0,0,0 days 12:00:00,2017-01-01 06:00:00,2017-01-01 18:00:00,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...,3951119,42550,
2,ulwrf,nominalTop,avg,Upward long-wave radiation flux,0.0,0,0 days 18:00:00,2017-01-01 06:00:00,2017-01-02 00:00:00,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...,3948238,43273,
3,ulwrf,nominalTop,avg,Upward long-wave radiation flux,0.0,0,1 days 00:00:00,2017-01-01 06:00:00,2017-01-02 06:00:00,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...,3876964,42613,
4,ulwrf,nominalTop,avg,Upward long-wave radiation flux,0.0,0,1 days 06:00:00,2017-01-01 06:00:00,2017-01-02 12:00:00,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...,3865989,42782,


> **Note**: Above process is part of the mapping creation, the function call to `extract_datatree_chunk_index` in handled inside `build_idx_grib_mapping` function

In [8]:
# creating a mapping for a single horizon file which is to be used later
mapping = build_idx_grib_mapping("s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z.pgrb2af006", storage_options=dict(anon=True), remote_options=dict(anon=True), validate=True)
mapping.head()

The grib hierarchy in s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z.pgrb2af006 is not unique for 54 variables: ['gh', 't', 'r', 'u', 'v', 'gh', 't', 'r', 'u', 'v', 'gh', 't', 'r', 'u', 'v', 'gh', 't', 'r', 'u', 'v', 'gh', 't', 'r', 'u', 'v', 'u', 'v', 'u', 'v', 'gh', 't', 'r', 'u', 'v', 'gh', 't', 'r', 'u', 'v', 'gh', 't', 'r', 'u', 'v', 'gh', 't', 'r', 'u', 'v', 't', 'r', 'u', 'v', 'gh']


Unnamed: 0_level_0,offset_idx,date,attrs,length_idx,idx_uri,grib_uri,varname,typeOfLevel,stepType,name,level,step,time,valid_time,uri,offset_grib,length_grib,inline_value
idx,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
1,0,d=2017010118,HGT:10 mb:6 hour fcst:ENS=low-res ctl,47305,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,gh,isobaricInhPa,instant,Geopotential height,0.0,0 days 06:00:00,2017-01-01 18:00:00,2017-01-02,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,0,47305,
2,47305,d=2017010118,TMP:10 mb:6 hour fcst:ENS=low-res ctl,19935,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,t,isobaricInhPa,instant,Temperature,0.0,0 days 06:00:00,2017-01-01 18:00:00,2017-01-02,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,47305,19935,
3,67240,d=2017010118,RH:10 mb:6 hour fcst:ENS=low-res ctl,10966,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,r,isobaricInhPa,instant,Relative humidity,0.0,0 days 06:00:00,2017-01-01 18:00:00,2017-01-02,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,67240,10966,
4,78206,d=2017010118,UGRD:10 mb:6 hour fcst:ENS=low-res ctl,23206,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,u,isobaricInhPa,instant,U component of wind,0.0,0 days 06:00:00,2017-01-01 18:00:00,2017-01-02,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,78206,23206,
5,101412,d=2017010118,VGRD:10 mb:6 hour fcst:ENS=low-res ctl,20675,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,v,isobaricInhPa,instant,V component of wind,0.0,0 days 06:00:00,2017-01-01 18:00:00,2017-01-02,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,101412,20675,


### Building the index 

Now if we parse the runtime from the idx file , we can build a fully compatible k_index(kerchunk index) for that **particular file**. Before creating the index, we need to clean some of the data in the mapping and index dataframe for the some variables as they tend to contain duplicate values, as demonstrated below.

In [9]:
# cleaning the mapping 
mapping.loc[~mapping["attrs"].duplicated(keep="first"), :].head()

Unnamed: 0_level_0,offset_idx,date,attrs,length_idx,idx_uri,grib_uri,varname,typeOfLevel,stepType,name,level,step,time,valid_time,uri,offset_grib,length_grib,inline_value
idx,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
1,0,d=2017010118,HGT:10 mb:6 hour fcst:ENS=low-res ctl,47305,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,gh,isobaricInhPa,instant,Geopotential height,0.0,0 days 06:00:00,2017-01-01 18:00:00,2017-01-02,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,0,47305,
2,47305,d=2017010118,TMP:10 mb:6 hour fcst:ENS=low-res ctl,19935,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,t,isobaricInhPa,instant,Temperature,0.0,0 days 06:00:00,2017-01-01 18:00:00,2017-01-02,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,47305,19935,
3,67240,d=2017010118,RH:10 mb:6 hour fcst:ENS=low-res ctl,10966,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,r,isobaricInhPa,instant,Relative humidity,0.0,0 days 06:00:00,2017-01-01 18:00:00,2017-01-02,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,67240,10966,
4,78206,d=2017010118,UGRD:10 mb:6 hour fcst:ENS=low-res ctl,23206,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,u,isobaricInhPa,instant,U component of wind,0.0,0 days 06:00:00,2017-01-01 18:00:00,2017-01-02,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,78206,23206,
5,101412,d=2017010118,VGRD:10 mb:6 hour fcst:ENS=low-res ctl,20675,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,v,isobaricInhPa,instant,V component of wind,0.0,0 days 06:00:00,2017-01-01 18:00:00,2017-01-02,s3://noaa-gefs-pds/gefs.20170101/18/gec00.t18z...,101412,20675,


In [10]:
# this step will be performed for every grib-idx pair where we will be using the "mapping" dataframe which we created previously 
mapped_index = map_from_index(
    pd.Timestamp("2017-01-01T06"),
    mapping.loc[~mapping["attrs"].duplicated(keep="first"), :],
    idxdf.loc[~idxdf["attrs"].duplicated(keep="first"), :],
)
mapped_index.head()

Unnamed: 0,varname,typeOfLevel,stepType,name,step,level,time,valid_time,uri,offset,length,inline_value
0,gh,isobaricInhPa,instant,Geopotential height,0 days 06:00:00,0.0,2017-01-01 06:00:00,2017-01-01 12:00:00,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...,0,47493,
1,t,isobaricInhPa,instant,Temperature,0 days 06:00:00,0.0,2017-01-01 06:00:00,2017-01-01 12:00:00,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...,47493,19438,
2,r,isobaricInhPa,instant,Relative humidity,0 days 06:00:00,0.0,2017-01-01 06:00:00,2017-01-01 12:00:00,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...,66931,10835,
3,u,isobaricInhPa,instant,U component of wind,0 days 06:00:00,0.0,2017-01-01 06:00:00,2017-01-01 12:00:00,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...,77766,22625,
4,v,isobaricInhPa,instant,V component of wind,0 days 06:00:00,0.0,2017-01-01 06:00:00,2017-01-01 12:00:00,s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z...,100391,20488,


### Final step of building of Aggregation

For the final step of the aggregation, we will create an index for each GRIB file to cover a two-month period starting from the specified date and convert it into one combined index and you store this index for later use. We will be using the 6-hour horizon file for building the aggregation, this will be from `2017-01-01` to `2017-02-28`. This is because as we already know this way of aggregation only works for a particular `horizon` file. 

In [11]:
mapped_index_list = []

deduped_mapping = mapping.loc[~mapping["attrs"].duplicated(keep="first"), :]

# this process is manually done because the runtime and forecast time will vary between models i.e. GEFS, GFS, HRRR etc.
for date in pd.date_range("2017-01-01", "2017-02-28"):
  for runtime in range(0, 24, 6):   
    fname = f"s3://noaa-gefs-pds/gefs.{date.strftime('%Y%m%d')}/{runtime:02}/gec00.t{runtime:02}z.pgrb2af006"
    
  idxdf = parse_grib_idx(basename=fname, storage_options=dict(anon=True))

  mapped_index = map_from_index(
      pd.Timestamp(date + datetime.timedelta(hours=runtime)),
      deduped_mapping,
      idxdf.loc[~idxdf["attrs"].duplicated(keep="first"), :],
  )

  mapped_index_list.append(mapped_index)

s3_kind = pd.concat(mapped_index_list)

> Tip: To confirm the above step, check out this [notebook](https://nbviewer.org/github/Anu-Ra-g/GSoC2024_Kerchunk/blob/main/testing_index.ipynb)

## Using the aggregation

# Improve below this

In [12]:
axes = [
  pd.Index(
    [
      pd.timedelta_range(start="0 hours", end="24 hours", freq="6h", closed="right", name="6 hour"),
    ],
    name="step"
  ),
  pd.date_range("2017-01-01T00:00", "2017-03-01T00:00", freq="360min", name="valid_time")
]
axes

[Index([[0 days 06:00:00, 0 days 12:00:00, 0 days 18:00:00, 1 days 00:00:00]], dtype='object', name='step'),
 DatetimeIndex(['2017-01-01 00:00:00', '2017-01-01 06:00:00',
                '2017-01-01 12:00:00', '2017-01-01 18:00:00',
                '2017-01-02 00:00:00', '2017-01-02 06:00:00',
                '2017-01-02 12:00:00', '2017-01-02 18:00:00',
                '2017-01-03 00:00:00', '2017-01-03 06:00:00',
                ...
                '2017-02-26 18:00:00', '2017-02-27 00:00:00',
                '2017-02-27 06:00:00', '2017-02-27 12:00:00',
                '2017-02-27 18:00:00', '2017-02-28 00:00:00',
                '2017-02-28 06:00:00', '2017-02-28 12:00:00',
                '2017-02-28 18:00:00', '2017-03-01 00:00:00'],
               dtype='datetime64[ns]', name='valid_time', length=237, freq='360min')]

In [14]:
# Reinflating works with a instance of grib tree 

In [15]:
s3_store = reinflate_grib_store(
    axes=axes,
    aggregation_type=AggregationType.HORIZON,
    chunk_index=s3_kind.loc[s3_kind.varname.isin(["ulwrf", "prmsl"])],
    zarr_ref_store=deflated_s3_grib_tree_store  
)

  if lookup not in unique_groups:


In [16]:
s3_dt_subset = datatree.open_datatree(fsspec.filesystem("reference", fo=s3_store, remote_protocol="s3", remote_options={"anon": True}).get_mapper(""), engine="zarr", consolidated=False)

In [17]:
s3_dt_subset

In [18]:
s3_dt_subset.ulwrf.avg.nominalTop.ulwrf[0,0,0:10,8].plot()

OutOfBoundsTimedelta: Cannot convert 4618441417868443648 hours to timedelta64[ns] without overflow