# core

> This is a core library for the ERA5 dataset pipeline. It defines
a few helpful functions such as an API tester to test your API key and connection.

In [1]:
#| default_exp core

In [2]:
#| hide
from nbdev.showdoc import *

In [27]:
#| export
import os
import cdsapi
import hydra
import json
import tempfile
import argparse
import zipfile
import shutil
import geopandas as gpd
from pathlib import Path
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from omegaconf import DictConfig, OmegaConf
from pyprojroot import here
from importlib import import_module


## Utilities

Some utilities are provided to help you with the ERA5 dataset.

In [4]:
#| export
def describe(
    cfg: DictConfig=None,  # Configuration file
    )-> None:
    "Print a short banner followed by the YAML rendering of the pipeline's Hydra config"

    # Without a config, announce it and fall back to an empty one so the
    # YAML dump below still has something to render.
    config = cfg
    if config is None:
        print("No configuration file provided. Generating default configuration file.")
        config = OmegaConf.create()

    banner = "This package fetches ERA5 data. The following is the config file used by Hydra for the pipeline:\n"
    print(banner)
    print(OmegaConf.to_yaml(config))

In [5]:
#| exporti
def _expand_path(
        path: str   # Path on user's machine
        )->   str:  # Expanded path
    "Resolve `~`, environment variables and relative segments into an absolute path"

    # Applied inside-out: home directory first, then environment variables,
    # and finally conversion to an absolute path.
    return os.path.abspath(os.path.expandvars(os.path.expanduser(path)))

In [6]:
#| exporti
def _get_callable(func_path):
    """Dynamically import a callable from a dotted string path.

    Args:
        func_path (str): Fully qualified name such as ``"package.module.func"``.
            Everything before the last dot is imported as a module; the last
            component is looked up as an attribute of that module.

    Returns:
        The attribute named by the last component of ``func_path``.

    Raises:
        ValueError: If ``func_path`` has no module part (no dot, or a leading dot).
        ImportError: If the module part cannot be imported.
        AttributeError: If the module has no attribute with that name.
    """
    module_name, _, func_name = func_path.rpartition(".")
    if not module_name:
        # Fail with a message that names the offending value instead of an
        # opaque tuple-unpacking error.
        raise ValueError(
            f"Expected a dotted path like 'package.module.func', got: {func_path!r}"
        )
    module = import_module(module_name)
    return getattr(module, func_name)

In [7]:
#| exporti

def _create_directory_structure(
        base_path: str,  # The base directory where the structure will be created
        structure: dict  # A dictionary representing the directory structure
    )->None:
    """
    Recursively creates a directory structure from a mapping.

    Every key becomes a folder under `base_path`. A value that is itself a
    mapping describes the sub-folders of that folder; any other value
    (e.g. None) marks a leaf folder.

    Args:
        base_path (str): The base directory where the structure will be created.
        structure (dict): A mapping representing the directory structure.
            Any mapping type is accepted, not only `dict`. None is treated
            as "nothing to create".
    """
    # Local import keeps this function self-contained.
    from collections.abc import Mapping

    if structure is None:
        return

    for folder, substructure in structure.items():
        # Create the current directory (no error if it already exists)
        current_path = os.path.join(base_path, folder)
        os.makedirs(current_path, exist_ok=True)

        # Recurse for any mapping, not just `dict`: config containers are
        # mappings without subclassing dict, and their nested levels would
        # otherwise be skipped silently.
        if isinstance(substructure, Mapping):
            _create_directory_structure(current_path, substructure)

In [None]:
#| export

def kelvin_to_celsius(kelvin):
    """
    Shift a temperature from the Kelvin scale onto the Celsius scale.

    Args:
        kelvin (float): Temperature in Kelvin.

    Returns:
        float: The same temperature expressed in degrees Celsius.
    """
    # 0 degrees Celsius corresponds to 273.15 K
    zero_celsius_in_kelvin = 273.15
    celsius = kelvin - zero_celsius_in_kelvin
    return celsius

### A Class for Authenticating Google Drive

We're going to use a class to authenticate and interact with Google Drive. The goal is to have a simple interface to fetch the healthshed files dynamically from Google Drive in the pipeline.

In [8]:
#| export

class GoogleDriver:
    """
    Thin wrapper around PyDrive2 that logs into Google Drive with a service account.

    The key file is taken from `json_key_path` or, when that is not given,
    from the `GOOGLE_DRIVE_AUTH_JSON` environment variable. Authentication
    runs once in the constructor and the resulting drive object is kept on
    `self.drive` (also available through `get_drive()`).
    """
    def __init__(self, json_key_path=None):
        # An explicit argument wins; otherwise fall back to the environment
        key_path = json_key_path if json_key_path else os.getenv("GOOGLE_DRIVE_AUTH_JSON")
        self.json_key_path = key_path

        key_file_present = bool(key_path) and os.path.isfile(key_path)
        if not key_file_present:
            raise FileNotFoundError(f"Service account key file not found: {self.json_key_path}")

        self.drive = self._authenticate()

    def _authenticate(self):
        "Run the service-account login and return the resulting `GoogleDrive` object"
        service_config = {"client_json_file_path": self.json_key_path}
        gauth = GoogleAuth(
            settings={
                "client_config_backend": "service",
                "service_config": service_config,
            }
        )
        gauth.ServiceAuth()
        return GoogleDrive(gauth)

    def get_drive(self):
        "Return the authenticated drive object created in the constructor"
        return self.drive

Here's how we use it. The credentials for the data-pipeline service account are
available in the sandbox folder, and the path to said folder is set in the config:

In [9]:
from hydra import initialize, compose
from omegaconf import OmegaConf

# Unfortunately, we have to use the initialize function to load the config file.
# This is because the @hydra decorator does not work with notebooks very well;
# it is a known issue with Hydra: https://gist.github.com/bdsaglam/586704a98336a0cf0a65a6e7c247d248
#
# Just use the relative path from the notebook to the config dir.
with initialize(version_base=None, config_path="../conf"):
    cfg = compose(config_name='config.yaml')

In [10]:
# Build the authenticated wrapper from the key-file path stored in the config
# (joined onto the path returned by here()), then grab the drive object.
auth = GoogleDriver(json_key_path=here() / cfg.GOOGLE_DRIVE_AUTH_JSON.path)
drive = auth.get_drive()

Here's how we might check that the healthsheds are accessible in the drive:

In [11]:
# We're using the Madagascar healthshed folder as an example.
# NOTE: folder_id is read from the config but not used by the query below,
# which matches on the file title only.
folder_id = cfg.geographies.madagascar.healthsheds
folder_name = "healthsheds2022.zip"
# Search for non-trashed files whose title equals folder_name
file_list = drive.ListFile({'q': f" title='{folder_name}' and trashed = false "}).GetList()

# Show the title and MIME type of every match
for file in file_list:
    print(f"{file['title']} - {file['mimeType']}")

healthsheds2022.zip - application/zip


That being said, we can read the healthsheds into geopandas by downloading them to a temp directory. The healthsheds must be packaged as a zipped shapefile, with the files at the root of the zip archive.

In [12]:
# Everything downloaded here lives in a temporary directory that is removed
# when the `with` block ends.
with tempfile.TemporaryDirectory() as temp_dir:
    # Local path inside the temporary directory where the archive is saved
    zip_path = os.path.join(temp_dir, folder_name)

    # Download the first match of the Drive query from Google Drive
    file_obj = drive.CreateFile({'id': file_list[0]['id']})
    file_obj.GetContentFile(zip_path)

    # Read shapefile directly from ZIP
    gdf = gpd.read_file(f"zip://{zip_path}")




That works! So now we can patch the class to include this workflow:

In [13]:
#| export
from fastcore.basics import patch

In [14]:
#| export

@patch
def read_healthsheds(self:GoogleDriver, healthshed_zip_name):
    """
    Download a zipped shapefile from Google Drive and load it with geopandas.

    Args:
        healthshed_zip_name (str): Exact Drive title of the zip archive.
            If several files share the title, the first search result is used.

    Returns:
        The frame read from the archive, restricted to rows whose geometry is
        not null, with a fresh 0..n-1 index.

    Raises:
        FileNotFoundError: If no non-trashed file with that title is found.
    """
    # Escape backslashes and single quotes so a title containing them cannot
    # terminate the quoted literal in the Drive search query.
    escaped_title = healthshed_zip_name.replace("\\", "\\\\").replace("'", "\\'")
    file_list = self.drive.ListFile({'q': f" title='{escaped_title}' and trashed = false "}).GetList()

    # Report a missing file explicitly instead of failing on file_list[0]
    if not file_list:
        raise FileNotFoundError(
            f"No file titled '{healthshed_zip_name}' found in Google Drive"
        )

    with tempfile.TemporaryDirectory() as temp_dir:
        # Local path inside the temporary directory for the downloaded archive
        zip_path = os.path.join(temp_dir, healthshed_zip_name)

        # Download file from Google Drive
        file_obj = self.drive.CreateFile({'id': file_list[0]['id']})
        file_obj.GetContentFile(zip_path)

        # Read shapefile directly from ZIP
        gdf = gpd.read_file(f"zip://{zip_path}")

        # Drop rows without a geometry and renumber. Chaining avoids an
        # in-place modification of the filtered frame.
        gdf = gdf[gdf.geometry.notnull()].reset_index(drop=True)

        return gdf

And to check that it works:

In [15]:
# Authenticate again, this time keeping the wrapper so the patched
# read_healthsheds method can be called on it.
driver = GoogleDriver(json_key_path=here() / cfg.GOOGLE_DRIVE_AUTH_JSON.path)
drive = driver.get_drive()
healthsheds = driver.read_healthsheds("healthsheds2022.zip")

# Summary statistics of the loaded table
healthsheds.describe()

Unnamed: 0,fs_pop,n_uid,n_instat,n_comp,n_shape
count,2766.0,2766.0,2766.0,2766.0,2766.0
mean,10493.05893,7.480116,6.318149,1.010484,1.036515
std,12127.817529,7.263235,4.939271,0.112019,0.39312
min,0.0,1.0,1.0,1.0,1.0
25%,4344.75,4.0,3.0,1.0,1.0
50%,7417.0,6.0,5.0,1.0,1.0
75%,12531.25,9.0,8.0,1.0,1.0
max,194782.0,104.0,62.0,3.0,15.0


## CDS File Handler Type

We're going to make a file handler type to help deal with CDS files. This is to fix [NSAPH-Data-Processing/era5_sandbox#13](https://github.com/NSAPH-Data-Processing/era5_sandbox/issues/13). 

Usually, when you download data, it comes out as a simple .nc file that can be opened with xarray. However, the CDS API can also return files that are not plain .nc files. For example, ERA5 data is stored in the .grib file format. This is a common format for meteorological data, and it is used by the ECMWF. When a query has multiple variables, they are sometimes downloaded as a .zip file to separate the grib from the netcdf.

So, below, we define a class that can handle the file no matter what the type is. It will check the file type and then use the appropriate method to open it. The class will also have a method to check if the file is a .zip file, and if so, it will unzip it and return the path to the unzipped file.

In [16]:
#| export

class ClimateDataFileHandler:
    """
    Locate the NetCDF file(s) inside a download from the Climate Data Store (CDS).

    A download is either a single file that is used as-is, or a ZIP archive
    (recognised by its leading bytes, whatever the file extension says) that
    bundles several NetCDF files. For an archive, the members are extracted
    to a temporary directory and classified by the ``stepType-instant`` /
    ``stepType-accum`` markers in their file names.

    Call `prepare()` (or simply `get_dataset()`) to inspect the file, and
    `cleanup()` once the extracted files are no longer needed. After
    `cleanup()` the handler can be prepared again.
    """

    def __init__(self, input_path: str):
        """
        Core initialization. It requires only the path. The flags are
        then used to do the handling logic.
        """
        self.original_path = Path(input_path)

        # major flag here for logic
        self.is_zip = False

        # the unzipping directory (a tempfile.TemporaryDirectory once extracted)
        self.unzipped_dir = None

        # the instant data, such as temperature
        self.instant_file = None

        # the cumulative data, such as precipitation
        self.accum_file = None

        # any extraneous data
        self.other_files = []

        # ready to be used
        self._prepared = False

    def prepare(self):
        """
        Inspect the file and prepare the appropriate NetCDF paths.

        Does nothing if the handler is already prepared.

        Raises:
            FileNotFoundError: If the input path does not exist.
        """
        if self._prepared:
            return

        if not self.original_path.exists():
            raise FileNotFoundError(f"{self.original_path} does not exist")

        # Detect ZIP by magic number rather than by extension: the archive
        # may carry a misleading suffix such as .nc.
        with open(self.original_path, "rb") as f:
            sig = f.read(4)
            self.is_zip = sig == b'PK\x03\x04'

        if self.is_zip:
            self._unzip_and_scan()
        else:
            # A non-archive file is treated as the 'instant' dataset
            self.instant_file = str(self.original_path)

        self._prepared = True

    def _unzip_and_scan(self):
        """Extract and identify stepType-specific NetCDFs from ZIP."""
        self.unzipped_dir = tempfile.TemporaryDirectory()
        try:
            with zipfile.ZipFile(self.original_path, 'r') as zip_ref:
                zip_ref.extractall(self.unzipped_dir.name)
        except Exception:
            # Do not leave a half-extracted temporary directory behind
            self.cleanup()
            raise

        # Only .nc files at the root of the archive are considered. Sorting
        # makes the outcome independent of directory listing order.
        for f in sorted(Path(self.unzipped_dir.name).glob("*.nc")):
            if "stepType-instant" in f.name:
                self.instant_file = str(f)
            elif "stepType-accum" in f.name:
                self.accum_file = str(f)
            else:
                self.other_files.append(str(f))

    def get_dataset(self, type: str = "instant") -> str:
        """
        Get the appropriate dataset path, preparing the file first if needed.

        Args:
            type: 'instant', 'accum' or 'any'. With 'any' the first available
                of the instant file, the accum file and the first other file
                is returned; this is None when nothing was found.

        Returns:
            Path of the requested file as a string.

        Raises:
            ValueError: If `type` is not one of the values above, or if
                'instant' / 'accum' is requested but no such file was found.
            FileNotFoundError: If the input path does not exist.
        """
        self.prepare()

        if type == "instant" and self.instant_file:
            return self.instant_file
        elif type == "accum" and self.accum_file:
            return self.accum_file
        elif type == "any":
            return self.instant_file or self.accum_file or (self.other_files[0] if self.other_files else None)
        else:
            raise ValueError(f"No file found for requested type '{type}'")

    def cleanup(self):
        """
        Clean up any temporary directories created during unzip.

        Safe to call more than once. When an archive had been extracted, the
        recorded paths are forgotten as well, because they point into the
        directory that was just deleted; the next `prepare()` extracts again.
        """
        if self.unzipped_dir is None:
            return

        self.unzipped_dir.cleanup()
        self.unzipped_dir = None

        # Drop the now-stale paths and allow prepare() to run again
        self.instant_file = None
        self.accum_file = None
        self.other_files = []
        self._prepared = False

In [None]:
import xarray as xr
from fastcore.test import test_fail

eg_file = here() / "data/input/madagascar_2023_10.nc"

# this fails because the nc file downloaded has grib and netcdf in it, so
# xr cannot handle it.
def wont_work(multilayer_file):
    "Try to open the raw download directly with xarray (expected to raise)"
    xr.open_dataset(multilayer_file)

# `args` must be a tuple because test_fail unpacks it into the call. Without
# the trailing comma the bare path is unpacked instead, which raises on its
# own and makes the check pass without wont_work ever being called.
test_fail(
    wont_work,
    args=(eg_file,)
)

# equivalent to saying try: wont_work(eg_file) Except: some error handling

The above fails because the download contains temperature and precipitation data, which get encoded silently as different formats. Even though it is one file, it contains both grib and netcdf data and is encoded as a .zip file. So we use the class to read it instead:

In [30]:
# Let the handler inspect the download, then open the two extracted files
# separately: one for the 'instant' variables and one for the 'accum' ones.
handler = ClimateDataFileHandler(eg_file)
handler.prepare()
ds1 = xr.open_dataset(handler.get_dataset("instant"))
ds2 = xr.open_dataset(handler.get_dataset("accum"))

In [29]:
ds1

In [31]:
ds2

In [32]:
handler.cleanup()

Great! Let's add a context handler and this can be added to the pipeline.

In [35]:
#| export

@patch
def __enter__(self:ClimateDataFileHandler):
    "Prepare the file on entering a `with` block and hand back the handler itself"
    self.prepare()
    return self

@patch
def __exit__(self:ClimateDataFileHandler, exc_type, exc_val, exc_tb):
    "Run `cleanup()` on leaving the `with` block; returns None, so exceptions are not suppressed"
    self.cleanup()

In [36]:
# Same workflow as before, but preparation and cleanup are handled by the
# context manager instead of explicit prepare()/cleanup() calls.
with ClimateDataFileHandler(eg_file) as handler:
    ds1 = xr.open_dataset(handler.get_dataset("instant"))
    ds2 = xr.open_dataset(handler.get_dataset("accum"))

    # Print while still inside the block, before cleanup() runs
    print(ds1)
    print(ds2)

<xarray.Dataset> Size: 16kB
Dimensions:     (valid_time: 1, latitude: 59, longitude: 33)
Coordinates:
    number      int64 8B ...
  * valid_time  (valid_time) datetime64[ns] 8B 2023-10-01
  * latitude    (latitude) float64 472B -11.6 -11.85 -12.1 ... -25.85 -26.1
  * longitude   (longitude) float64 264B 42.7 42.95 43.2 ... 50.2 50.45 50.7
    expver      <U4 16B ...
Data variables:
    d2m         (valid_time, latitude, longitude) float32 8kB ...
    t2m         (valid_time, latitude, longitude) float32 8kB ...
Attributes:
    GRIB_centre:             ecmf
    GRIB_centreDescription:  European Centre for Medium-Range Weather Forecasts
    GRIB_subCentre:          0
    Conventions:             CF-1.7
    institution:             European Centre for Medium-Range Weather Forecasts
    history:                 2025-04-30T16:38 GRIB to CDM+CF via cfgrib-0.9.1...
<xarray.Dataset> Size: 9kB
Dimensions:     (valid_time: 1, latitude: 59, longitude: 33)
Coordinates:
    number      int64 8B ..

## Tests and Main

In `nbdev`, our tests are embedded in the notebook. Whenever you export the notebook, all the cells that are specified to run are run, and hence, the tests are executed. The tests are also exported. This is a great way to ensure that your documentation is always up-to-date. For this module, we're using the `testAPI()` function as our main test.

In [None]:
#| export
def testAPI(
    cfg: DictConfig=None,  # Hydra config; `development_mode` decides whether the test file is kept
    dataset:str="reanalysis-era5-pressure-levels"  # Name of the CDS dataset to request from
    )-> bool:  # True when the dummy download succeeded, False otherwise
    "Check the CDS API key and connection by downloading one small GRIB field"

    # `cfg` defaults to None and the key may be missing from the config, so
    # read the flag defensively instead of failing before the connection is
    # even tried. Without a config the downloaded file is removed afterwards.
    testing = bool(cfg.get("development_mode", False)) if cfg is not None else False
    output_path = here("data") / "testing"

    if cfg is not None:
        print(OmegaConf.to_yaml(cfg))

    try:
        client = cdsapi.Client()

        # build request
        request = {
            'product_type': ['reanalysis'],
            'variable': ['geopotential'],
            'year': ['2024'],
            'month': ['03'],
            'day': ['01'],
            'time': ['13:00'],
            'pressure_level': ['1000'],
            'data_format': 'grib',
        }

        # The download is written to `target`; make sure its folder exists so
        # a missing directory is not reported as an API failure.
        output_path.mkdir(parents=True, exist_ok=True)
        target = output_path / 'test_download.grib'

        print(f"Testing API connection by downloading a dummy dataset to {output_path}...")

        client.retrieve(dataset, request, target)

        # Keep the file only in development mode
        if not testing:
            os.remove(target)

        print("API connection test successful.")
        return True

    except Exception as e:
        # Deliberately broad: any failure is reported to the user and turned
        # into a False result rather than raised.
        print("API connection test failed.")
        print("Did you set up your API key with CDS? If not, please visit https://cds.climate.copernicus.eu/how-to-api#install-the-cds-api-client")
        print(f"Error: {e}")
        return False

We can see that this API tester tool works with Hydra configuration:

In [None]:
from hydra import initialize, compose
from omegaconf import OmegaConf

# Unfortunately, we have to use the initialize function to load the config file.
# This is because the @hydra decorator does not work with notebooks very well;
# it is a known issue with Hydra: https://gist.github.com/bdsaglam/586704a98336a0cf0a65a6e7c247d248
#
# Just use the relative path from the notebook to the config dir.
with initialize(version_base=None, config_path="../conf"):
    cfg = compose(config_name='config.yaml')

# Print the composed configuration as YAML
describe(cfg)

This package fetches ERA5 data. The following is the config file used by Hydra for the pipeline:

query:
  product_type: reanalysis
  variable:
  - 2m_dewpoint_temperature
  - 2m_temperature
  - skin_temperature
  - total_precipitation
  year:
  - 2010
  - 2011
  month:
  - 1
  - 2
  - 3
  - 4
  - 5
  - 6
  - 7
  - 8
  - 9
  - 10
  - 11
  - 12
  day:
  - 1
  - 2
  - 3
  - 4
  - 5
  - 6
  - 7
  - 8
  - 9
  - 10
  - 11
  - 12
  - 13
  - 14
  - 15
  - 16
  - 17
  - 18
  - 19
  - 20
  - 21
  - 22
  - 23
  - 24
  - 25
  - 26
  - 27
  - 28
  - 29
  - 30
  - 31
  time:
  - 0
  - 1
  - 2
  - 3
  - 4
  - 5
  - 6
  - 7
  - 8
  - 9
  - 10
  - 11
  - 12
  - 13
  - 14
  - 15
  - 16
  - 17
  - 18
  - 19
  - 20
  - 21
  - 22
  - 23
  area:
  - 0
  - 360
  - -90
  - 90
  data_format: netcdf
  download_format: unarchived
datapaths:
  input: null
  output: null



### Importing the Main Function

Important: using `__main__` in nbdev and Hydra is a little bit tricky. We need to define the main function in the module ONLY ONCE and then when we export the notebook to script, we need to add the `nbdev.imports.IN_NOTEBOOK` variable. This way, the main function will only be executed when we run the notebook and not when we import the module.

```python
from nbdev.imports import IN_NOTEBOOK
```

You'll see this listed throughout the notebooks.

In [None]:
#| export
@hydra.main(version_base=None, config_path="../../conf", config_name="config")
def main(cfg: DictConfig) -> None:
    "Hydra entry point: create the folders described by `cfg.datapaths`, then run the API check"

    # The folder layout is created under the project's "data" directory
    data_root = here() / "data"
    _create_directory_structure(data_root, cfg.datapaths)

    # Run the connection test; its boolean result is not used here
    testAPI(cfg=cfg)

In [None]:
#| export
#| eval: false
# nbdev is only needed to detect notebook execution. When it is not
# installed, behave like a plain script. Only ImportError is caught so that
# unrelated errors (and Ctrl-C) are not swallowed.
try:
    from nbdev.imports import IN_NOTEBOOK
except ImportError:
    IN_NOTEBOOK = False

# Run the pipeline only when executed as a script, never from a notebook
if __name__ == "__main__" and not IN_NOTEBOOK:
    main()

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()