# core

> This is a core library for the ERA5 dataset pipeline. It defines
a few helpful functions, such as an API tester that checks your CDS API key and connection.

In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import os
import cdsapi
import hydra
import json
import tempfile
import geopandas as gpd
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from omegaconf import DictConfig, OmegaConf
from pyprojroot import here
from importlib import import_module


## Utilities

Some utilities are provided to help you with the ERA5 dataset.

In [None]:
#| export
def describe(
    cfg: DictConfig = None,  # Configuration file
) -> None:
    "Print the Hydra configuration used by the pipeline as YAML (an empty config if none is given)."
    if cfg is not None:
        config = cfg
    else:
        # Fall back to an empty OmegaConf config so the YAML dump still works
        print("No configuration file provided. Generating default configuration file.")
        config = OmegaConf.create()

    intro = "This package fetches ERA5 data. The following is the config file used by Hydra for the pipeline:\n"
    print(intro)
    print(OmegaConf.to_yaml(config))

In [None]:
#| exporti
def _expand_path(
    path: str,  # Path on user's machine
) -> str:  # Expanded path
    "Return `path` as an absolute path with `~` and environment variables expanded."
    # Home-directory expansion runs first, then $VARS, then the result is
    # made absolute relative to the current working directory.
    with_vars = os.path.expandvars(os.path.expanduser(path))
    return os.path.abspath(with_vars)

In [None]:
#| exporti
def _get_callable(
    func_path: str,  # Dotted path such as "package.module.function"
):  # The object named by `func_path`
    """Dynamically import a callable from a dotted string path.

    Raises:
        ValueError: if `func_path` has no module part (e.g. "func" or ".func").
        ImportError: if the module cannot be imported.
        AttributeError: if the module has no attribute with that name.
    """
    # rpartition splits on the last dot: "a.b.c" -> ("a.b", ".", "c")
    module_name, sep, func_name = func_path.rpartition(".")
    if not sep or not module_name:
        # Same exception type as before, but with a message that names the bad input
        raise ValueError(
            f"Expected a dotted path like 'package.module.function', got {func_path!r}"
        )
    module = import_module(module_name)
    return getattr(module, func_name)

In [None]:
#| exporti

def _create_directory_structure(
        base_path: str,  # The base directory where the structure will be created
        structure: dict  # A dictionary representing the directory structure
    )->None:
    """
    Recursively create a directory tree described by a (possibly nested) mapping.

    Each key becomes a directory under `base_path`. If its value is itself a
    mapping, the function recurses into it. Any other value (e.g. `None`) marks
    a leaf directory. Directories that already exist are left as they are.

    Args:
        base_path (str): The base directory where the structure will be created.
        structure (dict): A mapping representing the directory structure. Any
            `collections.abc.Mapping` is accepted, including OmegaConf's `DictConfig`.
    """
    # Local import keeps this exported cell self-contained.
    from collections.abc import Mapping

    for folder, substructure in structure.items():
        # str() guards against non-string YAML keys (e.g. `2024:`), which
        # os.path.join would reject with a TypeError.
        current_path = os.path.join(base_path, str(folder))
        os.makedirs(current_path, exist_ok=True)

        # Test for Mapping rather than dict: `main` passes `cfg.datapaths`, an
        # OmegaConf DictConfig. Nested DictConfigs are Mappings but not dict
        # subclasses, so a dict check would silently skip nested folders.
        if isinstance(substructure, Mapping):
            _create_directory_structure(current_path, substructure)

### A Class for Authenticating Google Drive

We're going to use a class to authenticate and interact with Google Drive. The goal is to have a simple interface for fetching the healthshed files dynamically from Google Drive in the pipeline.

In [None]:
#| export

class GoogleDriver:
    """
    Authenticate with Google Drive through a service account and hold the drive handle.

    Uses PyDrive2's service-account flow. The key file path comes from the
    `json_key_path` argument or, when that is not given, from the
    `GOOGLE_DRIVE_AUTH_JSON` environment variable. Authentication happens in
    the constructor. The healthshed download method (`read_healthsheds`) is
    attached to this class later in the module with fastcore's `@patch`.
    """
    def __init__(self, json_key_path=None):
        # Explicit argument wins; otherwise fall back to the environment variable.
        self.json_key_path = json_key_path or os.getenv("GOOGLE_DRIVE_AUTH_JSON")
        if not self.json_key_path:
            # Distinguish "nothing configured" from "configured path is wrong".
            raise FileNotFoundError(
                "No service account key file given: pass `json_key_path` or set the "
                "GOOGLE_DRIVE_AUTH_JSON environment variable."
            )
        if not os.path.isfile(self.json_key_path):
            raise FileNotFoundError(f"Service account key file not found: {self.json_key_path}")
        self.drive = self._authenticate()

    def _authenticate(self):
        "Run PyDrive2's service-account authentication and return a `GoogleDrive` handle."
        settings = {
            "client_config_backend": "service",
            "service_config": {
                # Callers may pass a pathlib.Path (e.g. `here() / ...`); hand
                # PyDrive2 a plain string path.
                "client_json_file_path": os.fspath(self.json_key_path),
            },
        }
        gauth = GoogleAuth(settings=settings)
        gauth.ServiceAuth()
        return GoogleDrive(gauth)

    def get_drive(self):
        "Return the authenticated PyDrive2 `GoogleDrive` instance."
        return self.drive

Here's how we use it. The credentials for the data-pipeline service account are
available in the sandbox folder, and the path to the key file (relative to the project root) is set in the config:

In [11]:
from hydra import compose, initialize
from omegaconf import OmegaConf

# The @hydra.main decorator doesn't work well inside notebooks (a known Hydra
# issue: https://gist.github.com/bdsaglam/586704a98336a0cf0a65a6e7c247d248),
# so the config is loaded with the compose API instead.
# `config_path` is relative to this notebook.
with initialize(version_base=None, config_path="../conf"):
    cfg = compose(config_name='config.yaml')

In [43]:
key_path = here() / cfg.GOOGLE_DRIVE_AUTH_JSON.path  # service-account key, relative to the project root
auth = GoogleDriver(json_key_path=key_path)
drive = auth.get_drive()

Here's how we might check that the healthsheds are accessible in the drive:

In [None]:
# NOTE(review): folder_id is read from the config but not used in the query below.
folder_id = cfg.GOOGLE_DRIVE_AUTH_JSON.healthsheds_id
folder_name = "healthsheds2022.zip"

# Drive search: non-trashed files whose title matches exactly
query = f" title='{folder_name}' and trashed = false "
file_list = drive.ListFile({'q': query}).GetList()

for entry in file_list:
    print(f"{entry['title']} - {entry['mimeType']}")

healthsheds2022.zip (1v2y-WhsTQxYyj8AWQPYkoVH0l-542IUQ) - application/zip


With the file located, we can read the healthsheds into geopandas by downloading them to a temporary directory. The healthsheds must be a zipped shapefile package with the files at the root of the zip archive.

In [39]:
# Download into a throwaway directory that is deleted when the block exits.
with tempfile.TemporaryDirectory() as scratch:
    zip_path = os.path.join(scratch, folder_name)

    # Use the first match from the search in the previous cell
    first_match_id = file_list[0]['id']
    file_obj = drive.CreateFile({'id': first_match_id})
    file_obj.GetContentFile(zip_path)

    # geopandas reads the shapefile from inside the archive via a zip:// path
    gdf = gpd.read_file(f"zip://{zip_path}")




That works! So now we can patch the class to include this workflow:

In [None]:
#| export
from fastcore.basics import patch

In [None]:
#| export

def _drive_title_query(title, folder_id=None):
    "Build a Drive search query for non-trashed files named `title`, optionally inside `folder_id`."
    def _quote(value):
        # Drive query values are single-quoted; escape backslashes and quotes
        # so names like "O'Brien.zip" don't break (or alter) the query.
        return "'" + str(value).replace("\\", "\\\\").replace("'", "\\'") + "'"

    query = f"title = {_quote(title)} and trashed = false"
    if folder_id is not None:
        query += f" and {_quote(folder_id)} in parents"
    return query


@patch
def read_healthsheds(
    self:GoogleDriver,
    healthshed_zip_name,  # File name of the zipped shapefile on Google Drive
    folder_id=None,       # Optional Drive folder ID to restrict the search to
):
    """Download a zipped shapefile from Google Drive and return it as a GeoDataFrame.

    The zip is searched for by exact title. If several files match, the first
    one returned by Drive is used. Rows with a missing geometry are dropped and
    the index is reset.

    Raises:
        FileNotFoundError: if no matching file is found on the drive.
    """
    matches = self.drive.ListFile({'q': _drive_title_query(healthshed_zip_name, folder_id)}).GetList()
    if not matches:
        where = f" in folder {folder_id!r}" if folder_id is not None else ""
        raise FileNotFoundError(f"No file named {healthshed_zip_name!r} found on Google Drive{where}")

    with tempfile.TemporaryDirectory() as temp_dir:
        # basename keeps a name containing path separators from escaping temp_dir
        zip_path = os.path.join(temp_dir, os.path.basename(healthshed_zip_name))

        # Download file from Google Drive
        file_obj = self.drive.CreateFile({'id': matches[0]['id']})
        file_obj.GetContentFile(zip_path)

        # Read shapefile directly from ZIP
        gdf = gpd.read_file(f"zip://{zip_path}")

    # Drop rows whose geometry is missing (null); other geometry checks are not done here
    return gdf[gdf.geometry.notnull()].reset_index(drop=True)

And to check that it works:

In [None]:
key_path = here() / cfg.GOOGLE_DRIVE_AUTH_JSON.path
driver = GoogleDriver(json_key_path=key_path)
drive = driver.get_drive()

# Fetch and load the healthsheds through the patched method
healthsheds = driver.read_healthsheds("healthsheds2022.zip")

healthsheds.describe()

Unnamed: 0,fs_uid,fs_pop,n_uid,n_instat,reg_uid,reg_name,dist_uid,dist_name,fs_type,fs_name,fs_ll,n_comp,n_shape,geometry
0,A0YU5ksfXZS,13014,6,6,O0yrAFTjghG,Vatovavy,hBOXdumAvNc,Mananjary,CSB2,CSB2 Morafeno,POINT (48.180332 -21.097472),1.0,1,"POLYGON ((48.21537 -20.95662, 48.21593 -20.956..."
1,A1SY9AiVPYF,3103,4,4,I9lEj4mALls,Analamanga,vHRv6NgA70x,Manjakandriana,CSB1,CSB1 Ambohidraisolo,POINT (47.879317 -19.161348),1.0,1,"POLYGON ((47.85923 -19.09954, 47.8601 -19.0998..."
2,A38WhL0NPsX,2344,3,3,O0yrAFTjghG,Vatovavy,hBOXdumAvNc,Mananjary,CSB2,CSB2 Mahatsara Iefaka,POINT (48.326937 -21.11468),1.0,1,"POLYGON ((48.35588 -21.05353, 48.35801 -21.057..."
3,A6fVNQgqqJg,7494,5,5,kgGIXgdG56r,Haute Matsiatra,BU35owjfn8G,Vohibato,CSB2,CSB2 Ankaromalaza,,1.0,1,"POLYGON ((47.18552 -21.70624, 47.19275 -21.711..."
4,A77QRkmKUul,8387,5,4,I9lEj4mALls,Analamanga,dsDbxSkO1ST,Andramasina,CSB1,CSB1 Mangabe,POINT (47.716094 -19.176215),1.0,1,"POLYGON ((47.73249 -19.15372, 47.73267 -19.153..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2768,zw74in2A9Vn,4580,5,5,A8UMJuP8iI3,Boeny,ffiVmdBUwzI,Marovoay,CSB2,CSB2 Ampijoroa Nord,,1.0,1,"POLYGON ((46.73145 -16.12848, 46.73365 -16.131..."
2769,zwhfpU5j9aV,7440,5,5,PTqLWwjcAox,Anosy,KBe7h4EfJDf,Taolagnaro,CSB2,CSB2 Tanandava,,1.0,1,"POLYGON ((47.00558 -24.44654, 47.00489 -24.446..."
2770,zwyM0iDw9X7,5705,3,3,wR0PL2iap0s,Atsinanana,xgvRu8zZAZK,Marolambo,CSB1,CSB1 Maroariana I,POINT (47.948408 -20.025144),1.0,1,"POLYGON ((47.95335 -19.9937, 47.95573 -20.0087..."
2771,zxQu4lRMMP9,7647,6,5,zJ9UJ7RhCwV,Diana,s3HejcPkUeJ,Antsiranana II,CSB2,CSB2 Antsalaka,POINT (49.251201 -12.640404),1.0,1,"POLYGON ((49.27604 -12.6479, 49.27603 -12.6479..."


## Tests and Main

In `nbdev`, our tests are embedded in the notebook. Whenever you run `nbdev_test` (or `nbdev_prepare`), every cell that is not marked to be skipped is executed, and hence the tests are run. Cells marked with `#| export`, like `testAPI()` below, are also exported to the library. This is a great way to ensure that your documentation is always up-to-date. For this module, we're using the `testAPI()` function as our main test.

In [None]:
#| export
def testAPI(
    cfg: DictConfig=None,
    dataset:str="reanalysis-era5-pressure-levels"
    )-> bool:
    """Check the CDS API key and connection by downloading a tiny ERA5 sample.

    Requests a single field (geopotential at 1000 hPa, 2024-03-01 13:00) from
    `dataset` and writes it to `data/testing/test_download.grib` under the
    project root. The file is kept only when `development_mode` is truthy.

    Args:
        cfg: Hydra/OmegaConf configuration. `None` is treated as an empty config.
        dataset: CDS dataset name passed to `cdsapi.Client.retrieve`.

    Returns:
        True if the download succeeded, otherwise False (the error is printed).
    """
    # Mirror `describe`: a missing config means an empty one rather than a crash.
    if cfg is None:
        cfg = OmegaConf.create()

    # `development_mode` is optional. `.get` returns the default instead of
    # raising when the key is absent (e.g. in a strict Hydra-composed config).
    testing = cfg.get("development_mode", False)
    output_path = here("data") / "testing"

    print(OmegaConf.to_yaml(cfg))

    try:
        # retrieve() writes straight into output_path, which may not exist yet
        output_path.mkdir(parents=True, exist_ok=True)

        client = cdsapi.Client()

        # Smallest useful request: one variable, one level, one hour
        request = {
            'product_type': ['reanalysis'],
            'variable': ['geopotential'],
            'year': ['2024'],
            'month': ['03'],
            'day': ['01'],
            'time': ['13:00'],
            'pressure_level': ['1000'],
            'data_format': 'grib',
        }

        target = output_path / 'test_download.grib'

        print("Testing API connection by downloading a dummy dataset to {}...".format(output_path))

        client.retrieve(dataset, request, str(target))

        # Keep the sample only in development mode
        if not testing and target.exists():
            target.unlink()

        print("API connection test successful.")
        return True

    except Exception as e:
        # This is a connectivity check: report any failure and signal it via the return value
        print("API connection test failed.")
        print("Did you set up your API key with CDS? If not, please visit https://cds.climate.copernicus.eu/how-to-api#install-the-cds-api-client")
        print("Error: {}".format(e))
        return False

The API tester reads its settings from the Hydra configuration. Here's how to load that configuration in a notebook and inspect it with `describe()`:

In [None]:
from hydra import compose, initialize
from omegaconf import OmegaConf

# The @hydra.main decorator doesn't work well inside notebooks (a known Hydra
# issue: https://gist.github.com/bdsaglam/586704a98336a0cf0a65a6e7c247d248),
# so the config is loaded with the compose API instead.
# `config_path` is relative to this notebook.
with initialize(version_base=None, config_path="../conf"):
    cfg = compose(config_name='config.yaml')

# Print the loaded configuration as YAML
describe(cfg)

This package fetches ERA5 data. The following is the config file used by Hydra for the pipeline:

query:
  product_type: reanalysis
  variable:
  - 2m_dewpoint_temperature
  - 2m_temperature
  - skin_temperature
  - total_precipitation
  year:
  - 2010
  - 2011
  month:
  - 1
  - 2
  - 3
  - 4
  - 5
  - 6
  - 7
  - 8
  - 9
  - 10
  - 11
  - 12
  day:
  - 1
  - 2
  - 3
  - 4
  - 5
  - 6
  - 7
  - 8
  - 9
  - 10
  - 11
  - 12
  - 13
  - 14
  - 15
  - 16
  - 17
  - 18
  - 19
  - 20
  - 21
  - 22
  - 23
  - 24
  - 25
  - 26
  - 27
  - 28
  - 29
  - 30
  - 31
  time:
  - 0
  - 1
  - 2
  - 3
  - 4
  - 5
  - 6
  - 7
  - 8
  - 9
  - 10
  - 11
  - 12
  - 13
  - 14
  - 15
  - 16
  - 17
  - 18
  - 19
  - 20
  - 21
  - 22
  - 23
  area:
  - 0
  - 360
  - -90
  - 90
  data_format: netcdf
  download_format: unarchived
datapaths:
  input: null
  output: null



### Importing the Main Function

Important: using `__main__` in nbdev and Hydra is a little bit tricky. We need to define the main function in the module ONLY ONCE, and when we export the notebook to a script, we guard the call with the `nbdev.imports.IN_NOTEBOOK` variable. This way, `main()` is executed only when the exported module is run as a script. It is not executed when the notebook is run or when the module is imported.

```python
from nbdev.imports import IN_NOTEBOOK
```

You'll see this listed throughout the notebooks.

In [None]:
#| export
@hydra.main(version_base=None, config_path="../../conf", config_name="config")
def main(cfg: DictConfig) -> None:
    "Pipeline entry point: create the `data/` folder tree from `cfg.datapaths`, then test the CDS API."
    data_root = here() / "data"
    _create_directory_structure(data_root, cfg.datapaths)

    # The boolean result is not used here; testAPI prints its own status
    testAPI(cfg=cfg)

In [None]:
#| export
#| eval: false
try:
    from nbdev.imports import IN_NOTEBOOK
except ImportError:
    # nbdev (or the name) is unavailable, e.g. when the exported module runs
    # on its own. Treat that as "not in a notebook". Catching only ImportError
    # (not a bare except) lets KeyboardInterrupt and real errors propagate.
    IN_NOTEBOOK = False

# Run the Hydra entry point only when executed as a script, never from the notebook.
if __name__ == "__main__" and not IN_NOTEBOOK:
    main()

In [None]:
#| hide
import nbdev
nbdev.nbdev_export()  # regenerate the library modules from the notebooks