In [1]:
# (C) Copyright 1996- ECMWF.
#
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

In [2]:
from pathlib import Path
import multiprocessing
import tqdm
from datetime import datetime

from itertools import product, groupby
import random

import numpy as np 
import pandas as pd 
import metview as mv # for retrieving grib data from MARS

In [3]:
# Root folder under which all the downloaded data are stored; it is concatenated directly
# with the subfolder names further down, so it has to end with '/'
main_out_folder = '/Data/'

# Subfolder for the ERA5 data
ERA5_loc = f'{main_out_folder}ERA5/'

In [4]:
# Forecast steps (in hours) that are downloaded: one step every 24 hours, for 45 steps in total
lead_days_used = list(24 * np.arange(1, 46))

# Large-scale atmospheric variability: domain (coordinates as in N/W/S/E) and grid resolution
area_atm_var = [55, -15, 20, 45]
grid_atm_var = [1, 1]

# Precipitation: domain of interest (Mediterranean domain) and grid resolution in degrees
area_precipt = [48, -10, 27, 41]
grid_precipt = [.25, .25]

# Reforecast variables; each entry is name: [levelist, levtype, param] as used in the MARS request
Reforecasts_vars_used = {
    'SLP': [0, 'sfc', '151.128'],
    'Z500': [500, 'pl', '129.128'],
}

# ERA5 variables; each entry is [levelist, levtype, param, name of the output file (without extension)]
ERA5_vars_used = [
    [0, 'sfc', '151.128', 'D1_Mean_SLP'],    # SLP data
    [500, 'pl', '129.128', 'D1_Mean_Z500'],  # Z500 data
]

In [5]:
# Create all folders for storing the reforecasts data: first the ones of the large-scale atmospheric
# variability and then the ones of the precipitation, each with one subfolder per forecast type
for stored_var, fc_type in product([*Reforecasts_vars_used.keys(), 'Precipitation'], ['cf', 'pf']):
    Path(main_out_folder + stored_var + '/' + fc_type + '/').mkdir(parents=True, exist_ok=True)

# Create the subfolder for storing the ERA5 data
Path(ERA5_loc).mkdir(parents=True, exist_ok=True)

del(stored_var, fc_type)

## Download reforecasts data

Use initialization dates that belong to the same model cycle, so that the reforecast data are consistent. Details about the changes between cycles are available at https://www.ecmwf.int/en/forecasts/documentation-and-support/changes-ecmwf-model

In [6]:
# All days within Cycle 46r1 (11 June 2019 - 30 June 2020)
initialization_dates = pd.date_range('20190611', '20200630')

# keep only the Mondays (weekday 0) and the Thursdays (weekday 3), and store them as 'YYYYMMDD' strings
initialization_dates = initialization_dates[initialization_dates.weekday.isin([0, 3])].strftime('%Y%m%d')

### Download data of SLP and Z500

In [7]:
def download_data(input_data):
    """
    Download the reforecasts of one atmospheric variable for one initialization date and save them as a
    GRIB file. Nothing is downloaded if the output file already exists.

    Data are downloaded internally from MARS and not from the S2S database, because the latter has data
    stored in coarse resolution, thus there is downscaling performed, which increases the errors.

    :param input_data: sequence of 3 in specific order: the variable name (a key of Reforecasts_vars_used),
                       the forecast type ('cf' or 'pf') and the initialization date as 'YYYYMMDD' string
    :raises ValueError: if the forecast type is neither 'cf' nor 'pf'
    """

    param, type_used, init_date = input_data

    # fail early with a clear message, instead of failing after the checks with an unbound 'fc_all'
    if type_used not in ('cf', 'pf'):
        raise ValueError("type_used must be 'cf' or 'pf', got {!r}".format(type_used))

    levelist_used, levtype_used, param_used = Reforecasts_vars_used[param][:3]

    files_loc = main_out_folder+param+'/'+type_used+'/'
    file_name = files_loc+param+'_'+type_used+'_'+init_date+'.grb'

    if Path(file_name).exists(): # data are already available, so there is nothing to download
        return

    # hindcast dates: same month and day as the initialization date, for each of the 20 previous years
    init_year = int(init_date[:4])
    hdates_used = ['{}{}'.format(x, init_date[-4:]) for x in range(init_year-20, init_year)]

    # the request is the same for both forecast types, apart from 'type' and the additional 'number' for 'pf'
    request = dict(Class = 'od',
                   date = init_date,
                   expver = 1,
                   hdate = hdates_used,
                   levelist = [levelist_used],
                   levtype = levtype_used,
                   param = param_used,
                   step = lead_days_used,
                   time = ['00:00:00'],
                   stream = 'enfh',
                   type = type_used)
    if type_used == 'pf':
        request['number'] = list(np.arange(1, 11))
    request['area'] = area_atm_var
    request['grid'] = grid_atm_var

    fc_all = mv.retrieve(**request)

    mv.write(file_name, fc_all) # save data

In [8]:
# Download in parallel the data for all combinations of variable, forecast type and initialization date
All_combs = list(product(Reforecasts_vars_used.keys(), ['cf', 'pf'], initialization_dates))

# the context manager releases the worker processes also when one of the downloads fails
with multiprocessing.Pool() as pool: # object for multiprocessing
    Downloads = list(tqdm.tqdm(pool.imap(download_data, All_combs), total=len(All_combs), position=0, leave=True))

del(pool, All_combs)

### Download precipitation data

In [9]:
def download_precip(input_data):
    """
    Download the precipitation reforecasts (param '228.128') for one initialization date from MARS,
    multiply them by 1000 (conversion to mm) and save them as a GRIB file. Nothing is downloaded if the
    output file already exists.

    NOTE: a function with the same name is defined again further down for the ERA5 precipitation and
    replaces this one, so this definition has to be re-run before it is used again after that point.

    :param input_data: sequence of 2 in specific order: the forecast type ('cf' or 'pf') and the
                       initialization date as 'YYYYMMDD' string
    :raises ValueError: if the forecast type is neither 'cf' nor 'pf'
    """

    type_used, init_date = input_data

    # fail early with a clear message, instead of failing after the checks with an unbound 'fc_all'
    if type_used not in ('cf', 'pf'):
        raise ValueError("type_used must be 'cf' or 'pf', got {!r}".format(type_used))

    files_loc = main_out_folder+'Precipitation/'+type_used+'/'
    file_name = files_loc+'Precipitation_'+type_used+'_'+init_date+'.grb'

    if Path(file_name).exists(): # data are already available, so there is nothing to download
        return

    # hindcast dates: same month and day as the initialization date, for each of the 20 previous years
    init_year = int(init_date[:4])
    hdates_used = ['{}{}'.format(x, init_date[-4:]) for x in range(init_year-20, init_year)]

    # the request is the same for both forecast types, apart from 'type' and the additional 'number' for 'pf'
    request = dict(Class = 'od',
                   date = init_date,
                   expver = 1,
                   hdate = hdates_used,
                   levelist = 0,
                   levtype = 'sfc',
                   param = '228.128',
                   step = lead_days_used,
                   time = ['00:00:00'],
                   stream = 'enfh',
                   type = type_used)
    if type_used == 'pf':
        request['number'] = list(np.arange(1, 11))
    request['area'] = area_precipt
    request['grid'] = grid_precipt

    fc_all = mv.retrieve(**request)

    fc_all = fc_all*1000 # convert to mm

    mv.write(file_name, fc_all) # save data

In [10]:
# Download in parallel the precipitation data for all combinations of forecast type and initialization date
All_combs = list(product(['cf', 'pf'], initialization_dates))

# the context manager releases the worker processes also when one of the downloads fails
with multiprocessing.Pool() as pool: # object for multiprocessing
    Downloads = list(tqdm.tqdm(pool.imap(download_precip, All_combs), total=len(All_combs), position=0, leave=True))

del(pool, All_combs)

## Download ERA5

### Download SLP & Z500 data consistent with reforecast data

In [11]:
# all daily dates of the period of interest, as 'YYYYMMDD' strings
dates_generated_all = [day.strftime('%Y%m%d') for day in pd.date_range(start='19790101', end='20210101')]

# dates are chunked per year-month (first 6 characters of the date string) for efficient download,
# since MARS uses this subsetting for storing the data
dates_atm_vars = [list(month_days) for _, month_days in groupby(dates_generated_all, key=lambda day: day[:6])]

In [12]:
def download_ERA5_0UTC(input_data):
    '''
    Function for downloading data of atmospheric variables from MARS, only for the 0 UTC time step

    :param input_data: list of 4 in specific order: [levelist, levtype, atm_var, dates_subset], where
        levelist: level of interest, e.g. 0 for surface parameters, 500 for 500 hPa
        levtype: leveltype of interest, e.g. pressure levels ('pl'), surface ('sfc')
        atm_var: parameter of interest (e.g. the SLP is flagged as 151.128 at MARS)
        dates_subset: the subset of dates to be downloaded
    :return: the retrieved data, as returned by mv.retrieve
    '''

    levelist, levtype, atm_var, dates_subset = input_data # inputs to be a list of 4 in specific order!

    # function for retrieving the data from MARS
    fc_all = mv.retrieve(Class = 'ea', # class of data, e.g. ERA5 ('ea')
                         stream = 'oper', # stream of interest, e.g. Ensemble ('enfo'), Deterministic ('oper')
                         expver = 1, # experiment's version, e.g. Operational (1), Research (xxxx[A-Z/0-9])
                         type = 'an', # type of data, e.g. Analysis ('an')
                         param = atm_var,
                         levtype = levtype,
                         levelist = levelist,
                         date = dates_subset,
                         time = 0, # keep only 0 UTC for having consistency with reforecasts data
                         area = area_atm_var,
                         grid = grid_atm_var,
                         )

    return fc_all

In [13]:
# Download the 0 UTC data of each ERA5 variable chunk by chunk, save them in one file per variable
# and also keep them in memory in AtmVar
AtmVar = {}
for var in ERA5_vars_used:

    levelist, levtype, atm_var, file_name = var # same for all chunks of dates, so unpacked only once

    Daily = mv.Fieldset()
    for i_dates in tqdm.tqdm(dates_atm_vars):
        i_daily = download_ERA5_0UTC([levelist, levtype, atm_var, i_dates])
        Daily.append(i_daily)

    mv.write(ERA5_loc + file_name + '_00UTC.grb', Daily) # save data
    AtmVar[file_name.split('_')[-1]] = Daily # key is the last part of the file name, e.g. 'SLP'

del(var, i_dates, levelist, levtype, atm_var, file_name, i_daily, Daily)

### Download ERA5 SLP & Z500 data as daily means based on all hourly data

In [14]:
def download_ERA5_fullhourly(input_data):
    '''
    Function for downloading hourly data of atmospheric variables from MARS and calculating daily mean values

    :param input_data: list of 4 in specific order: [levelist, levtype, atm_var, dates_subset], where
        levelist: level of interest, e.g. 0 for surface parameters, 500 for 500 hPa
        levtype: leveltype of interest, e.g. pressure levels ('pl'), surface ('sfc')
        atm_var: parameter of interest (e.g. the SLP is flagged as 151.128 at MARS)
        dates_subset: the subset of dates to be downloaded
    :return: mv.Fieldset with one daily mean appended per date of dates_subset, in the order of dates_subset
    '''

    levelist, levtype, atm_var, dates_subset = input_data # inputs to be a list of 4 in specific order!

    # function for retrieving the data from MARS
    fc_all = mv.retrieve(Class = 'ea', # class of data, e.g. ERA5 ('ea')
                         stream = 'oper', # stream of interest, e.g. Ensemble ('enfo'), Deterministic ('oper')
                         expver = 1, # experiment's version, e.g. Operational (1), Research (xxxx[A-Z/0-9])
                         type = 'an', # type of data, e.g. Analysis ('an')
                         param = atm_var,
                         levtype = levtype,
                         levelist = levelist,
                         date = dates_subset,
                         time = list(range(0,24)), # all hourly timesteps
                         area = area_atm_var,
                         grid = grid_atm_var,
                         )

    Daily_sub = mv.Fieldset() # mv for values for dates_subset

    # get the 'date' field from the fc_all object; converted to array only once, outside of the loop
    fields = np.array(mv.grib_get(fc_all, ['date']))

    for day_i in dates_subset: # loop through the whole list of unique dates_subset

        used_indices = np.where(fields == day_i)[0] # indices that belong to the day of interest
        used_indices = used_indices.astype('float64') # convert to float64 for using it at mv object
        daily_subset = fc_all[used_indices] # subset and keep only the fields of the day of interest
        Daily_sub.append(mv.mean(daily_subset)) # calculate the daily mean and append it

    return Daily_sub

In [15]:
# Download in parallel the daily means of each ERA5 variable (one task per chunk of dates), merge the
# chunks, save them in one file per variable and also keep them in memory in AtmVar
Times = len(dates_atm_vars)

AtmVar = {}
for var in ERA5_vars_used:

    levelist, levtype, atm_var, file_name = var
    Inputs = [(levelist, levtype, atm_var, dates_chunk) for dates_chunk in dates_atm_vars]

    # the context manager releases the worker processes also when one of the downloads fails
    with multiprocessing.Pool() as pool_atmvar: # object for multiprocessing
        Daily = list(tqdm.tqdm(pool_atmvar.imap(download_ERA5_fullhourly, Inputs),
                               total=Times, position=0, leave=True)) # list of mv.Fieldsets
    del(pool_atmvar)

    for i in range(1, Times): # concatenate all Fieldsets to the first one
        Daily[0].append(Daily[i])

    Daily = Daily[0] # keep the full set of the atmospheric variable data

    mv.write(ERA5_loc + file_name + '.grb', Daily) # save data

    AtmVar[file_name.split('_')[-1]] = Daily # key is the last part of the file name, e.g. 'SLP'

del(Times, var, levelist, levtype, atm_var, file_name, Inputs, Daily, i)

### Download ERA5 precipitation data

In [16]:
# Chunks of dates for the precipitation: the 1st chunk is used as it is, and every other chunk is
# preceded by the last date of its previous chunk, because the daily total of a date also needs
# forecasts initiated on the previous date (so only the very first date gets no daily total)
dates_precipit = [dates_atm_vars[0]] + [previous_chunk[-1:] + current_chunk
                                        for previous_chunk, current_chunk in zip(dates_atm_vars, dates_atm_vars[1:])]

In [17]:
def download_precip(dates_subset):
    '''
    Function for downloading precipitation data from MARS and calculating daily total values

    NOTE: this definition replaces the function with the same name that is defined further up for the
    reforecasts data.

    :param dates_subset: list of consecutive dates as 'YYYYMMDD' strings; the first date is only needed
                         for its 18:00 forecast, so no daily total is calculated for it
    :return: mv.Fieldset with one daily total per date of dates_subset, except for the first date
    :raises ValueError: if the number of retrieved fields is not 24 per requested date
    '''

    fc_all = mv.retrieve(Class = 'ea', # class of data, e.g. ERA5 ('ea')
                         stream = 'oper', # stream of interest, e.g. Ensemble ('enfo'), Deterministic ('oper')
                         expver = 1, # experiment's version, e.g. Operational (1), Research (xxxx[A-Z/0-9])
                         type = 'fc', # type of data, e.g. Forecast ('fc'), Analysis ('an')
                         param = 'tp', # used parameter: Total Precipitation ('tp' = '228.128')
                         levtype = 'sfc',
                         levelist = 0,
                         date = dates_subset, # use the subset of dates
                         time = [6, 18], # time steps of interest (forecast fields only at 06:00 & 18:00)
                         step = list(range(7,19)), # precipitation is calculated from short-range forecasted data
                         grid = grid_precipt,
                         area = area_precipt,
                         )

    # the daily totals below are based on the position of the fields, so a different number of fields
    # than the expected one (2 times x 12 steps per date) would silently give wrong totals
    fields_expected = 24*len(dates_subset)
    if len(fc_all) != fields_expected:
        raise ValueError('expected {} precipitation fields for dates {} - {}, but retrieved {}'.format(
            fields_expected, dates_subset[0], dates_subset[-1], len(fc_all)))

    Daily_sub = mv.Fieldset() # create the mv object for storing the daily values for the dates_subset

    for i_day in range(len(dates_subset) - 1): # loop through all dates apart from the first one

        # downloaded data are in the sequence: day_i 06:00 steps 7-18 (12 steps), 18:00 steps 7-18 (12 steps)
        start_indice = 12 + 24*i_day # data for daily accumulation start at 18:00 step 7 of previous day
        end_indice = 12 + 24*(i_day+1) # data end at 06:00 step 18 of current day (24 hourly steps in total)

        sub = mv.sum(fc_all[start_indice : end_indice]) # total daily precipitation
        sub = mv.grib_set(sub, ['date', int(dates_subset[i_day + 1])]) # replace the date field with the correct date

        Daily_sub.append(sub) # append to Daily_sub metview object

    return Daily_sub

In [18]:
# Download in parallel the daily total precipitation of all chunks of dates (list of mv.Fieldsets);
# the context manager releases the worker processes also when one of the downloads fails
with multiprocessing.Pool() as pool_precip: # object for multiprocessing
    Precip = list(tqdm.tqdm(pool_precip.imap(download_precip, dates_precipit),
                            total=len(dates_precipit), position=0, leave=True))

for i in range(1, len(Precip)): # concatenate all Fieldsets to the first one
    Precip[0].append(Precip[i])

Precip = Precip[0] # keep the full set of the precipitation data
Precip = Precip*1000 # convert to mm
Precip = mv.read(data=Precip, area=area_precipt) # crop to the actual area of interest

mv.write(ERA5_loc + 'D1_Total_Precipitation.grb', Precip) # Save the daily total precipitation file

del(pool_precip, i)