In [None]:
#import all package functions from a local file
from custom_python_package import *

In [None]:
def create_file_name(input_data_path, temp_data_path, output_data_path, input_era5_file_name, input_ecmwf_file_name, output_era5_file_name, output_ecmwf_file_name, admin_file_name, admin_code_label):
    """Build every file path used by the pipeline and publish them as module globals.

    Paths are formed by plain string concatenation (directory + base name + suffix),
    so each ``*_data_path`` argument must already end with a path separator.

    Args:
        input_data_path: Directory prefix of the raw input files.
        temp_data_path: Directory prefix of the intermediary ECMWF files.
        output_data_path: Directory prefix of the ERA5 and ECMWF export files.
        input_era5_file_name: Base name (no extension) of the ERA5 input;
            '.grib' and '-regrid.nc' are appended to it.
        input_ecmwf_file_name: Base name (no extension) of the ECMWF input;
            '.grib' is appended to it.
        output_era5_file_name: Base name of the ERA5 '.parquet.gzip' outputs.
        output_ecmwf_file_name: Base name of the ECMWF '.parquet.gzip' files
            (intermediary and exported).
        admin_file_name: Admin boundary file, relative to ``input_data_path``,
            extension included.
        admin_code_label: Published unchanged as the global ``admin_code_label``.
            ``run_pipeline`` reads that global, so the label given here is the one
            the pipeline uses instead of whatever variable happens to exist already.

    Returns:
        An empty tuple. The results are delivered through these globals:
        era5_raw_data_file_path, ecmwf_raw_data_file_path, admin_boundary_file_path,
        era5_regrid_file_path, ecmwf_processed_pixel_file_path,
        ecmwf_processed_adm_file_path, ecmwf_bias_corr_pixel_file_path,
        ecmwf_bias_corr_adm_file_path, ref_grid_file_path,
        era5_processed_pixel_file_path, era5_processed_adm_file_path,
        ecmwf_quantile_pixel_file_path, ecmwf_quantile_bias_corr_pixel_file_path,
        ecmwf_quantile_adm_file_path, ecmwf_quantile_bias_corr_adm_file_path
        and admin_code_label.
    """
    parquet_suffix = '.parquet.gzip'

    # Common prefixes of the three families of generated files.
    ecmwf_temp_prefix = temp_data_path + output_ecmwf_file_name
    era5_output_prefix = output_data_path + output_era5_file_name
    ecmwf_output_prefix = output_data_path + output_ecmwf_file_name

    pipeline_settings = {
        # input datasets
        'era5_raw_data_file_path': input_data_path + input_era5_file_name + '.grib',
        'ecmwf_raw_data_file_path': input_data_path + input_ecmwf_file_name + '.grib',
        'admin_boundary_file_path': input_data_path + admin_file_name,
        'era5_regrid_file_path': input_data_path + input_era5_file_name + '-regrid' + '.nc',

        # ECMWF intermediary datasets
        'ecmwf_processed_pixel_file_path': ecmwf_temp_prefix + '-processed-pixel' + parquet_suffix,
        'ecmwf_processed_adm_file_path': ecmwf_temp_prefix + '-processed-adm' + parquet_suffix,
        'ecmwf_bias_corr_pixel_file_path': ecmwf_temp_prefix + '-bias-corrected-pixel' + parquet_suffix,
        'ecmwf_bias_corr_adm_file_path': ecmwf_temp_prefix + '-bias-corrected-adm' + parquet_suffix,
        'ref_grid_file_path': ecmwf_temp_prefix + '-reference-grid' + parquet_suffix,

        # ERA5 intermediary/export datasets
        'era5_processed_pixel_file_path': era5_output_prefix + '-processed-pixel' + parquet_suffix,
        'era5_processed_adm_file_path': era5_output_prefix + '-processed-adm' + parquet_suffix,

        # ECMWF export datasets
        'ecmwf_quantile_pixel_file_path': ecmwf_output_prefix + '-quantile-pixel' + parquet_suffix,
        'ecmwf_quantile_bias_corr_pixel_file_path': ecmwf_output_prefix + '-quantile-bias-corrected-pixel' + parquet_suffix,
        'ecmwf_quantile_adm_file_path': ecmwf_output_prefix + '-quantile-adm' + parquet_suffix,
        'ecmwf_quantile_bias_corr_adm_file_path': ecmwf_output_prefix + '-quantile-bias-corrected-adm' + parquet_suffix,

        # Previously accepted but ignored: run_pipeline depends on this global.
        'admin_code_label': admin_code_label,
    }

    # A parameter cannot be declared `global`, so everything is published through
    # the module namespace in one step.
    globals().update(pipeline_settings)

    return ()

In [None]:
def run_pipeline(perform_regrid):
    """Run the ECMWF / ERA5 processing steps in order, using the global file paths.

    Every path (and ``admin_code_label``) is read from module-level globals, so
    those must be defined before this function is called.

    Args:
        perform_regrid: When equal to ``True``, ``regrid_climate_data`` is called
            before the ERA5 pre-processing; otherwise that step is skipped and the
            regridded ERA5 path is passed on as-is.

    Returns:
        An empty tuple.
    """
    # ECMWF pre-processing: per the original notes, converts the grib file into a
    # dataframe, starts processing the format and exports it to a parquet file.
    pre_process_ecmwf_data(
        ecmwf_raw_data_file_path,
        admin_boundary_file_path,
        ref_grid_file_path,
        ecmwf_processed_pixel_file_path,
        ecmwf_processed_adm_file_path,
        admin_code_label,
    )

    # Optional step: convert the ERA5 grid to the same one used for ECMWF.
    # The equality test (rather than plain truthiness) is kept on purpose.
    regrid_requested = perform_regrid == True
    if regrid_requested:
        regrid_climate_data(
            era5_raw_data_file_path,
            ecmwf_raw_data_file_path,
            era5_regrid_file_path,
        )

    # ERA5 pre-processing, fed with the regridded ERA5 file path.
    pre_process_era5_data(
        era5_regrid_file_path,
        admin_boundary_file_path,
        ref_grid_file_path,
        era5_processed_pixel_file_path,
        era5_processed_adm_file_path,
    )

    # Bias correction between ECMWF and ERA5 values, run once at the grid point
    # (pixel) level and once at the admin boundary level.
    bias_correction_jobs = (
        (ecmwf_processed_pixel_file_path, era5_processed_pixel_file_path, ecmwf_bias_corr_pixel_file_path),
        (ecmwf_processed_adm_file_path, era5_processed_adm_file_path, ecmwf_bias_corr_adm_file_path),
    )
    for ecmwf_path, era5_path, corrected_path in bias_correction_jobs:
        ecmwf_bias_correction(ecmwf_path, era5_path, corrected_path)

    # Quantile probabilities, run four times: pixel and admin level, each before
    # and after bias correction.
    quantile_jobs = (
        (ecmwf_processed_pixel_file_path, era5_processed_pixel_file_path, ecmwf_quantile_pixel_file_path),
        (ecmwf_bias_corr_pixel_file_path, era5_processed_pixel_file_path, ecmwf_quantile_bias_corr_pixel_file_path),
        (ecmwf_processed_adm_file_path, era5_processed_adm_file_path, ecmwf_quantile_adm_file_path),
        (ecmwf_bias_corr_adm_file_path, era5_processed_adm_file_path, ecmwf_quantile_bias_corr_adm_file_path),
    )
    for ecmwf_path, era5_path, quantile_path in quantile_jobs:
        compute_quantile_probability(ecmwf_path, era5_path, quantile_path)

    return ()

In [None]:
# File naming for every input, intermediary and output dataset. These values are
# meant for running the pipeline (or analysis) locally; in the Databricks pipeline
# they should be replaced by the correct datasets.

########################
# Country-specific file names: change these for each country.
# Example alternatives kept from an earlier naming (the variables actually read
# below are input_era5_file_name / input_ecmwf_file_name):
#era5_file_name = 'era5_total_precipitation_africa_1981_2023_all_months'
#ecmwf_file_name = 'ecmwf-monthly-seasonalforecast-1981-2023-afr'

########################
# Directories. Each one ends with '/' because file names are appended to it by
# plain string concatenation.
input_data_path = "./data/input_data/"
temp_data_path = "./data/temp_data/"
output_data_path = "./data/output_data/"

# Base names (no extension) of the raw ERA5 and ECMWF input files.
input_era5_file_name = "era5-total-precipitation-1981-2023"
input_ecmwf_file_name = "ecmwf-monthly-seasonalforecast-1981-2023-002"

# Base names used for the generated ERA5 and ECMWF files.
output_era5_file_name = "era5-ethiopia"
output_ecmwf_file_name = "ecmwf-ethiopia"

# Admin boundary shapefile (relative to input_data_path) and the admin code label.
admin_file_name = "admin_boundary_eth/eth_admbnda_adm1_csa_bofedb_2021.shp"
admin_code_label = "ADM1_PCODE"
########################



In [None]:
# Build all pipeline file paths from the configuration values defined above.
create_file_name(
    input_data_path=input_data_path,
    temp_data_path=temp_data_path,
    output_data_path=output_data_path,
    input_era5_file_name=input_era5_file_name,
    input_ecmwf_file_name=input_ecmwf_file_name,
    output_era5_file_name=output_era5_file_name,
    output_ecmwf_file_name=output_ecmwf_file_name,
    admin_file_name=admin_file_name,
    admin_code_label=admin_code_label,
)




In [None]:
# Run the full pipeline without the regridding step: the regridded ERA5 file path
# is passed on to the ERA5 pre-processing as-is.
run_pipeline(perform_regrid=False)