# GRIB to NetCDF4 conversion in the CADS

This notebook demonstrates how GRIB files are converted to NetCDF in the CADS such that users have full traceability of the processing chain, and can modify the process should they have different requirements for their netCDF format.

In [1]:
# Import libraries used
from typing import Any
import os
import xarray as xr
import cfgrib
import cdsapi

## Get example grib data from the CDS

To make it interesting we are going to request atmospheric, soil level and ocean wave data. This will give us multiple challenges in terms of handling different level types and spatial grids.

In [2]:
dataset = "reanalysis-era5-single-levels"
request = {
    'product_type': ['reanalysis'],
    'variable': ['10m_u_component_of_wind', '10m_v_component_of_wind', '2m_dewpoint_temperature', '2m_temperature', 'mean_sea_level_pressure', 'mean_wave_direction', 'mean_wave_period', 'sea_surface_temperature', 'significant_height_of_combined_wind_waves_and_swell', 'surface_pressure', 'total_precipitation'],
    'year': ['1985'],
    'month': ['02'],
    'day': ['03'],
    'time': ['00:00', '01:00', '02:00', '03:00', '04:00', '05:00', '06:00', '07:00', '08:00', '09:00', '10:00', '11:00', '12:00', '13:00', '14:00', '15:00', '16:00', '17:00', '18:00', '19:00', '20:00', '21:00', '22:00', '23:00'],
    'data_format': 'grib',
    'download_format': 'unarchived'
}

grib_file = "data.grib"
if not os.path.exists(grib_file): # Don't download the file if it already exists
    client = cdsapi.Client()
    grib_file = client.retrieve(dataset, request).download("data.grib")

# Get the base name of the file to use in the output file names
fname, _ = os.path.splitext(os.path.basename(grib_file))

## Configuration options

Now we set some options that we will use to open the grib file in xarray and perform some minor post-processing to produce our NetCDF file.

In [3]:
# Configuration options for opening the GRIB files as an xarray object,
#  these depend on the dataset your are working with, this example is for ERA5 single-levels

# The open_datasets_kwargs is a list of dictionaries, which is used to open the grib file into a list of
#  consistent hypercube which are compatible netCDF. There are a number of common elements, so we define a
#  common set and use them in each open_dataset_kwargs dictionary.
common_kwargs = {
    'time_dims': ['valid_time'],
    'ignore_keys': ['edition'],
    'extra_coords': {'expver': 'valid_time'},
    'coords_as_attributes': [
        'surface',
        'depthBelowLandLayer',
        'entireAtmosphere',
        'heightAboveGround',
        'meanSea'
    ]
}
open_datasets_kwargs: dict[str, Any] | list[dict[str, Any]] = [
    # To open the atmospheric variables
    {**common_kwargs, 'filter_by_keys': {'stream': ['oper']}, 'tag': 'stream-oper'},
    # To open the ocean-wave variables
    {**common_kwargs,'filter_by_keys': {'stream': ['wave']}, 'tag': 'stream-wave'}
]
# Keywords that are used to modify the xarray object after opening
post_open_datasets_kwargs: dict[str, Any] = {
    'rename': {
        'time': 'forecast_reference_time',
        'step': 'forecast_period',
        'isobaricInhPa': 'pressure_level',
        'hybrid': 'model_level'
    },
    'expand_dims': ['valid_time', 'pressure_level', 'model_level']
}

# Configuration options for writing the xarray object to a netcdf file,
#  These are the options used for all datasets in the CADS
# Keywords that are used when writing to netcdf
to_netcdf_kwargs: dict[str, Any] = {
    "engine": "h5netcdf",
}
# Compression options to use when writing to netcdf, note that they are dependent on the engine
compression_options = {
    "zlib": True,
    "complevel": 1,
    "shuffle": True,
}

print(
    f"Converting {grib_file} to netCDF files with:\n"
    f"  open_datasets_kwargs: {open_datasets_kwargs}\n"
    f"  post_open_datasets_kwargs: {post_open_datasets_kwargs}\n"
    f"  to_netcdf_kwargs: {to_netcdf_kwargs}\n"
    f"  compression_options: {compression_options}\n"
)

Converting data.grib to netCDF files with:
  open_datasets_kwargs: [{'time_dims': ['valid_time'], 'ignore_keys': ['edition'], 'extra_coords': {'expver': 'valid_time'}, 'coords_as_attributes': ['surface', 'depthBelowLandLayer', 'entireAtmosphere', 'heightAboveGround', 'meanSea'], 'filter_by_keys': {'stream': ['oper']}, 'tag': 'stream-oper'}, {'time_dims': ['valid_time'], 'ignore_keys': ['edition'], 'extra_coords': {'expver': 'valid_time'}, 'coords_as_attributes': ['surface', 'depthBelowLandLayer', 'entireAtmosphere', 'heightAboveGround', 'meanSea'], 'filter_by_keys': {'stream': ['wave']}, 'tag': 'stream-wave'}]
  post_open_datasets_kwargs: {'rename': {'time': 'forecast_reference_time', 'step': 'forecast_period', 'isobaricInhPa': 'pressure_level', 'hybrid': 'model_level'}, 'expand_dims': ['valid_time', 'pressure_level', 'model_level']}
  to_netcdf_kwargs: {'engine': 'h5netcdf'}
  compression_options: {'zlib': True, 'complevel': 1, 'shuffle': True}



## Open the GRIB as a dictionary of xarray datasets

This is handled differently depending on whether the kwargs are passed as a list of dictionaries, or a single dictionary.

If a list we iterate over each dictionary and return a dataset which matches it. These should include the filter_by_keys argument which will only select the fields which match the filter. This is used to split the grib file into complete hypercubes (which are compatible with NetCDF). We can then use the tag argument to label each hypercube dataset, and subsequent NetCDF file, with an appropriate name.

If a single dictionary, we first try to open directly with `xarray.open_dataset` as a single hypercube. If there is an inconsistency in the hypercube, then this will fail and we retry to open with `cfgrib.open_datasets` which will automatically split the grib into complete hypercubes in the form of a list of datasets. NOTE: One draw back here is that cfgrib does not provide the information as to how the grib file was split, so we cannot label the hypercubes appropriately, therefore we just label with an number. 

In [4]:
# if open_datasets_kwargs is a list, then we open the grib file as a list of datasets
if isinstance(open_datasets_kwargs, list):
    datasets: dict[str, xr.Dataset] = {}
    for i, open_ds_kwargs in enumerate(open_datasets_kwargs):
        # Default engine is cfgrib
        open_ds_kwargs.setdefault("engine", "cfgrib")
        # The tag in the open_datasets_kwargs is used to name the dataset,
        # and subsequently the NetCDF file, if no tag is provided the index number is used
        ds_tag = open_ds_kwargs.pop("tag", i)
        try:
            ds = xr.open_dataset(grib_file, **open_ds_kwargs)
        except Exception:
            ds = None
        if ds:
            datasets[f"{fname}_{ds_tag}"] = ds
else:
    # We make sure that cfgrib raises an error if it encounters any issues, 
    # e.g. with inconsistent field dimensions
    open_datasets_kwargs.setdefault("errors", "raise")

    # First try and open with xarray as a single dataset,
    try:
        datasets = {
            f"{fname}": xr.open_dataset(grib_file, **open_datasets_kwargs)
        }
    except Exception:
        print(
            f"Failed to open with xr.open_dataset({grib_file}, **{open_datasets_kwargs}), "
            "opening with cfgrib.open_datasets instead."
        )
        # cfgrib.open_datasets returns a list of datasets, and it is not yet possible to sensibly tag them,
        # so we use the index number as the tag
        datasets = {
            f"{fname}_{i}": ds
            for i, ds in enumerate(
                cfgrib.open_datasets(grib_file, **open_datasets_kwargs)
            )
        }

## Modify the xarray object to meet the defined standard

The raw output from cfgrib uses the native MARS-GRIB metadata keys for the dimensions and coordinates. We modify this such that the dimension and coordinate names are a little more user friendly, and easy to distinguish given overlaps with CF convention definitions.

To make these steps easier to work with we have defined stand-alone functions, then apply them as we iterate over the dictionary of datasets.

In [6]:
# Define a function to safely rename variables in an xarray dataset,
#  i.e. ensures that the new names are not already in the dataset
def safely_rename_variable(dataset: xr.Dataset, rename: dict[str, str]) -> xr.Dataset:
    """
    Rename variables in an xarray dataset,
    ensuring that the new names are not already in the dataset.
    """
    # Create a rename order based on variabels that exist in datasets, and if there is
    # a conflict, the variable that is being renamed will be renamed first.
    rename_order: list[str] = []
    conflicts: list[str] = []
    for old_name, new_name in rename.items():
        if old_name not in dataset:
            continue

        if new_name in dataset:
            rename_order.append(old_name)
            conflicts.append(old_name)
        else:
            rename_order = [old_name] + rename_order

    # Ensure that the conflicts are handled correctly
    # Is this necessary? We can let xarray fail by itself in the next step.
    for conflict in conflicts:
        new_name = rename[conflict]
        if (new_name not in rename_order) or (
            rename_order.index(conflict) > rename_order.index(new_name)
        ):
            raise ValueError(
                f"Refusing to to rename to existing variable name: {conflict}->{new_name}"
            )

    for old_name in rename_order:
        dataset = dataset.rename({old_name: rename[old_name]})

    return dataset

# Define a function to safely expand dimensions in an xarray dataset,
#  ensures that the data for the new dimensions are in the dataset
def safely_expand_dims(dataset: xr.Dataset, expand_dims: list[str]) -> xr.Dataset:
    """
    Expand dimensions in an xarray dataset, ensuring that the new dimensions are not already in the dataset
    and that the order of dimensions is preserved.
    """
    dims_required = [c for c in dataset.coords if c in expand_dims + list(dataset.dims)]
    dims_missing = [
        (c, i) for i, c in enumerate(dims_required) if c not in dataset.dims
    ]
    dataset = dataset.expand_dims(
        dim=[x[0] for x in dims_missing], axis=[x[1] for x in dims_missing]
    )
    return dataset


out_datasets: dict[str, xr.Dataset] = {}
for out_fname_base, dataset in datasets.items():
    dataset = safely_rename_variable(dataset, post_open_datasets_kwargs.get("rename", {}))

    dataset = safely_expand_dims(dataset, post_open_datasets_kwargs.get("expand_dims", []))

    out_datasets[out_fname_base] = dataset

datasets = out_datasets

## Write the datasets to NetCDF

We loop over each dataset in the dictionary and write to netCDF, using the specified options. Note that the compression options must be added to each variable in the dataset.

In [7]:
# put the engine in the to_netcdf_kwargs, and remove it from the compression options
to_netcdf_kwargs.setdefault("engine", compression_options.pop("engine", "h5netcdf"))
out_nc_files = []
for out_fname_base, dataset in datasets.items():
    to_netcdf_kwargs.update(
        {
            # Add the compression options to the encoding of each variable in the dataset
            "encoding": {var: compression_options for var in dataset},
        }
    )
    out_fname = f"{out_fname_base}.nc"
    dataset.to_netcdf(out_fname, **to_netcdf_kwargs)
    out_nc_files.append(out_fname)
print("NetCDF file(s) created:\n", "\n".join(out_nc_files))

NetCDF file(s) created:
 data_stream-oper.nc
data_stream-wave.nc
