# Workflow Example with the ERA5 input converter
Here we illustrate a complete workflow example using the ERA5 input converter.

## Imports

In [None]:
from pathlib import Path

import xarray as xr
import pandas as pd

import valenspy as vp #The Valenspy package
from valenspy.inputconverter_functions import _non_convertor, ERA5_to_CF

from yaml import safe_load
from typing import Union, List


In [None]:
# Input converter built from the ERA5_to_CF function imported above; it is used below to convert the ERA5 files.
ic = vp.InputConverter(ERA5_to_CF)


## 1. Convert the data

### Get the filenames of the ERA5 data corresponding to the CORDEX variable name


In [None]:

# Machine key used to select the dataset locations in dataset_PATHS.yml
machine = "hortense"


# User-defined settings; here we look at 2m temperature.
variable = "tas"  # CORDEX variable name, used as key in the lookup table
dataset = "ERA5"  # dataset key in dataset_PATHS.yml and prefix of the "<dataset>_lookup.yml" file
domain = "belgium" # sub-directory of the dataset path that holds this domain
time_freq = "daily" ## Time resolution of the post-processed files. Mind their naming: e.g. daily maximum temperature does not correspond to the ERA5 variable name as defined!

### Generate the path and filename of obs file

In [None]:
# Locate the valenspy source directory and read the CORDEX variable definitions
# and the lookup table of the selected observational dataset.

# Path(__file__).resolve().parent does not work inside a notebook
src_path = Path("../src/valenspy")  # hard-coded relative location -- to be removed

ancillary_dir = src_path / "ancilliary_data"

with open(ancillary_dir / "CORDEX_variables.yml") as cordex_file:
    CORDEX_VARIABLES = safe_load(cordex_file)

# ERA5-Land has the same variable definitions as ERA5, so it reuses the ERA5 lookup table
dataset_lookup = 'ERA5' if dataset == 'ERA5-Land' else dataset

with open(ancillary_dir / (dataset_lookup + "_lookup.yml")) as lookup_file:
    obs_LOOKUP = safe_load(lookup_file)

In [None]:
# This will become part of the PATH generator.

# Read the per-machine dataset locations from the path settings .yml
with open(src_path / "ancilliary_data" / "dataset_PATHS.yml") as paths_file:
    dataset_PATHS = safe_load(paths_file)
directory = dataset_PATHS[machine][dataset]

# Get the ERA5 names corresponding to the requested variable from the lookup table
obs_var = obs_LOOKUP[variable]['obs_name']
obs_long_name = obs_LOOKUP[variable]['obs_long_name']

# ERA5 files are stored under <directory>/<domain>/<time_freq>/<obs_long_name>/
subdirectory = Path(directory) / domain / time_freq / obs_long_name

# Select all matching netCDF files in that directory. glob() yields entries in
# arbitrary order, so sort them: files[0] is used further down to parse the file name.
files = sorted(subdirectory.glob(f"*-{time_freq}-*{obs_long_name}*.nc"))

In [None]:
# Test of the ERA5 converter: pass the list of files to the input converter and display the result
ERA5_ds = ic.convert_input(files) # Convert the input to the correct format
ERA5_ds


The file is ValEnsPy CF compliant.
100.00% of the variables are ValEnsPy CF compliant
ValEnsPy CF compliant: ['tas']


Unnamed: 0,Array,Chunk
Bytes,137.01 MiB,68.50 MiB
Shape,"(17520, 25, 41)","(8760, 25, 41)"
Dask graph,2 chunks in 5 graph layers,2 chunks in 5 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 137.01 MiB 68.50 MiB Shape (17520, 25, 41) (8760, 25, 41) Dask graph 2 chunks in 5 graph layers Data type float64 numpy.ndarray",41  25  17520,

Unnamed: 0,Array,Chunk
Bytes,137.01 MiB,68.50 MiB
Shape,"(17520, 25, 41)","(8760, 25, 41)"
Dask graph,2 chunks in 5 graph layers,2 chunks in 5 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [None]:
## Manual implementation of the ERA5 converter
# `file` is the list of ERA5 file paths collected above; the following cell reads file[0] and opens the whole list.
file = files

In [None]:
    # Manual, step-by-step version of the ERA5 -> CF conversion applied to the files in `file`.
    #
    # NOTE(review): the _convert_* unit helpers called below are not among the names
    # imported at the top of this notebook. They must be in scope before one of the
    # unit-conversion branches is reached -- TODO confirm where they are defined.

    if not file:
        raise FileNotFoundError("No ERA5 netCDF files found; check the dataset path, domain, time frequency and variable.")

    # Based on the name of the first file, determine which dataset we are dealing with
    filename = file[0].stem

    # Split the file name by '-' (! era5 and land are also separated by '-', so for
    # ERA5-Land the remaining parts are shifted by one position compared to ERA5)
    filename_parts = filename.split('-')

    is_land = len(filename_parts) > 1 and filename_parts[1] == 'land'
    obsdata_name = "ERA5-Land" if is_land else "ERA5"

    # File-name derived metadata is identical for every variable, so work it out once.
    # freq and domain follow the dataset name: positions 1 and 2 for ERA5, 2 and 3 for ERA5-Land.
    freq_idx = 2 if is_land else 1
    file_freq = filename_parts[freq_idx] if len(filename_parts) > freq_idx else None
    file_domain = filename_parts[freq_idx + 1] if len(filename_parts) > freq_idx + 1 else None

    # hard coded spatial resolution
    spatial_resolution = "0.1deg" if is_land else "0.25deg"

    # open the observation dataset
    ds = xr.open_mfdataset(file, combine='by_coords', chunks='auto')

    # open observational specific lookup dictionary
    with open(src_path / "ancilliary_data" / "ERA5_lookup.yml") as lookup_file:
        obs_LOOKUP = safe_load(lookup_file)

    # make observation CF compliant
    # (iterate over a snapshot of the names because `ds` is rebound inside the loop)
    for var_obs in list(ds.data_vars):

        # Get the CORDEX variable in the observational dataset using the observational lookup table
        var = next((k for k, v in obs_LOOKUP.items() if v.get('obs_name') == var_obs), None)

        if not var:  # Don't process variables that are not in the lookup table.
            continue

        # update variable name to CORDEX variable name
        ds = ds.rename_vars({obs_LOOKUP[var]["obs_name"]: var})

        # from here on, use CORDEX variable name to access data array and do rest of conversion

        # Unit conversion - hard coded ERA5 units for CORDEX CORE, double check beyond.
        obs_units = obs_LOOKUP[var]['obs_units']

        if obs_units in ('Celcius', 'degC'):
            ds[var] = _convert_Celcius_to_Kelvin(ds[var])

        elif obs_units == 'hPa':
            ds[var] = _convert_hPa_to_Pa(ds[var])  # hPa to Pa

        elif obs_units in ('mm', 'mm/hr'):
            # mm to kg m^-2 s^-1; conversion function reads time frequency (nseconds) of input ds to do conversion
            ds[var] = _convert_mm_to_kg_m2s(ds[var])

        elif obs_units in ('m', 'm/hr'):
            # m to kg m^-2 s^-1; conversion function reads time frequency (nseconds) of input ds to do conversion
            ds[var] = _convert_m_to_kg_m2s(ds[var])

        elif obs_units == 'J/m^2':
            # Energy fields go through the J/m^2 -> W/m^2 helper (previously the
            # m -> kg m^-2 s^-1 helper was called here by mistake).
            ds[var] = _convert_J_m2_to_W_m2(ds[var])

        # update unit attribute
        ds[var].attrs["units"] = CORDEX_VARIABLES[var]["units"]  # from the CORDEX look-up table

        # add necessary metadata
        ds[var].attrs["standard_name"]      = CORDEX_VARIABLES[var]["standard_name"]  # from the CORDEX look-up table
        ds[var].attrs["long_name"]          = CORDEX_VARIABLES[var]["long_name"]  # from the CORDEX look-up table
        ds[var].attrs["original_name"]      = obs_LOOKUP[var]["obs_name"]
        ds[var].attrs["original_long_name"] = obs_LOOKUP[var]["obs_long_name"]

        # rename dimensions
        ds[var] = ds[var].rename({"latitude": "lat", "longitude": "lon"})

        # convert the time dimension to a pandas datetime index --  do we want this to happen within the convertor? Or do we leave it up to the user?
        ds[var]['time'] = pd.to_datetime(ds[var].time)

        # additional attributes, read from the file name / fixed per dataset
        ds[var].attrs["freq"]               = file_freq
        ds[var].attrs["domain"]             = file_domain
        ds[var].attrs["dataset"]            = obsdata_name
        ds[var].attrs["spatial_resolution"] = spatial_resolution


In [None]:
# Inspect the converted data array; `var` still holds the value from the last iteration of the conversion loop above.
ds[var]

Unnamed: 0,Array,Chunk
Bytes,137.01 MiB,68.50 MiB
Shape,"(17520, 25, 41)","(8760, 25, 41)"
Dask graph,2 chunks in 5 graph layers,2 chunks in 5 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 137.01 MiB 68.50 MiB Shape (17520, 25, 41) (8760, 25, 41) Dask graph 2 chunks in 5 graph layers Data type float64 numpy.ndarray",41  25  17520,

Unnamed: 0,Array,Chunk
Bytes,137.01 MiB,68.50 MiB
Shape,"(17520, 25, 41)","(8760, 25, 41)"
Dask graph,2 chunks in 5 graph layers,2 chunks in 5 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
