## Data Downloader for ERA5 and GLoFAS data

defines some methods to download era5 data

In [37]:
import os, warnings, time
import cdsapi
import multiprocessing as mp

In [4]:
UID = #enter_uid_here
API_key = #enter_key_here

with open(os.path.join(os.path.expanduser('~'), '.cdsapirc'), 'w') as f:
    f.write('url: https://cds.climate.copernicus.eu/api/v2\n')
    f.write(f'key: {UID}:{API_key}')

SyntaxError: invalid syntax (<ipython-input-4-89ad931fed9e>, line 1)

In [84]:
def build_request(kwargs, input_checking=True):
    """Check request for mandatory fields to make valid CDS API retrievals."""
    kwargs = kwargs.copy()
    if kwargs['base_level'] == 'pressure' and 'pressure_level' not in kwargs:
        raise IOError('base_level is pressure, but pressure_level not in kwargs')
        
    
    mandatory_fields = ["product_type", "format", "variable", "year", "month"]
    if not input_checking:
        mandatory_fields = []
    
    assumed_args = {"day":    ["01", "02", "03", "04",
                               "05", "06", "07", "08",
                               "09", "10", "11", "12",
                               "13", "14", "15", "16",
                               "17", "18", "19", "20",
                               "21", "22", "23", "24",
                               "25", "26", "27", "28",
                               "29", "30", "31"],
                    "time":  ["00", "01", "02", "03", "04", "05",
                              "06", "07", "08", "09", "10", "11",
                              "12", "13","14", "15", "16", "17",
                              "18", "19", "20","21", "22", "23"]}
    assume_fields = assumed_args.keys()

    # input checks
    for key in mandatory_fields:  # add mandatory arguments
        if key not in kwargs: 
            raise ValueError(f'"{key}" not found in arguments, but is a mandatory field!')
    
    if kwargs['base_level'] == 'pressure' and 'pressure_level' not in kwargs:
        raise IOError('base_level is pressure, but pressure_level not in kwargs')
    
    request_name = f"reanalysis-era5-{kwargs.pop('base_level')}-levels"
    request = {}
    for key in mandatory_fields:
        #print(kwargs)
        request[key] = kwargs.pop(key)
            
    for key in list(kwargs):  # add optional arguments
        request[key] = kwargs.pop(key)
                                  
    for key in assume_fields:  # assume some arguments if not given
        if key not in request:
            warnings.warn(f'"{key}" not found in arguments, assuming {key}={assumed_args[key]}')
    return request
             
    
def test(a,b,c):
    print(b)
    time.sleep(5)
    return

def list_if_str(a):
    return a if isinstance(a, list) else list(a)

def cds_optimized_retrieval(save_to_folder: str, 
                            dataset_name: str, 
                            request_in: dict,
                            N_parallel_requests=1):
    c = cdsapi.Client()
    if N_parallel_requests>1:
        p = mp.Pool(int(N_parallel_requests))
    
    # download era5 data with the cdsapi
    # data request efficiency is highest when executed on a monthly basis
    if 'year' not in request_in:
        raise ValueError('"year" not given')
    if 'month' not in request_in:
        raise ValueError('"month" not given')
    years = list_if_str(request_in['year'])
    months = list_if_str(request_in['month'])

    # loop over time range
    for y in sorted(years):
        for m in sorted(months):
            m = str(m).zfill(2)  # leading zero

            request = request_in.copy()
            request['year'] = y
            request['month'] = m
            
            request = build_request(request)         
            save_to_filename = f'{save_to_folder}/{dataset_name}_{"".join(list(request["variable"]))}_{y}_{m}.nc'

            # start a request for one month; only execute if file does not exist
            if not os.path.isfile(save_to_filename):
                p.apply_async(test, args=(dataset_name, request, save_to_filename))
                #c.retrieve
    p.close()
    p.join()

In [85]:
# define areas of interest
area_dict = {'danube': '50/7/47/20',
             'asia': '55/-140/0/35',
             'usa': '50/-125/25/-70'
            }
# choose area: 'danube', 'asia', 'usa'


In [None]:
request = dict(product_type='reanalysis', format='netcdf', base_level='pressure', 
               #area=area_dict['usa'],
               variable=['geopotential', 'temperature', 'specific humidity'], 
               pressure_level=['850', '700', '500'], 
               year=[str(y) for y in range(2000, 2010)], 
               month=[str(a).zfill(2) for a in range(1,13)])

dataset_name = f"reanalysis-era5-{request['base_level']}-levels"


save_to_folder='/raid/home/srvx7/lehre/users/a1254888/ipython/ml_flood/data/'
cds_optimized_retrieval(save_to_folder, dataset_name, request, N_parallel_requests=10)

{'product_type': 'reanalysis', 'format': 'netcdf', 'variable': ['geopotential', 'temperature', 'specific humidity'], 'year': '2000', 'month': '01', 'pressure_level': ['850', '700', '500']}
{'product_type': 'reanalysis', 'format': 'netcdf', 'variable': ['geopotential', 'temperature', 'specific humidity'], 'year': '2000', 'month': '02', 'pressure_level': ['850', '700', '500']}
{'product_type': 'reanalysis', 'format': 'netcdf', 'variable': ['geopotential', 'temperature', 'specific humidity'], 'year': '2000', 'month': '07', 'pressure_level': ['850', '700', '500']}
{'product_type': 'reanalysis', 'format': 'netcdf', 'variable': ['geopotential', 'temperature', 'specific humidity'], 'year': '2000', 'month': '03', 'pressure_level': ['850', '700', '500']}
{'product_type': 'reanalysis', 'format': 'netcdf', 'variable': ['geopotential', 'temperature', 'specific humidity'], 'year': '2000', 'month': '05', 'pressure_level': ['850', '700', '500']}
{'product_type': 'reanalysis', 'format': 'netcdf', 'var



{'product_type': 'reanalysis', 'format': 'netcdf', 'variable': ['geopotential', 'temperature', 'specific humidity'], 'year': '2000', 'month': '11', 'pressure_level': ['850', '700', '500']}
{'product_type': 'reanalysis', 'format': 'netcdf', 'variable': ['geopotential', 'temperature', 'specific humidity'], 'year': '2000', 'month': '12', 'pressure_level': ['850', '700', '500']}
{'product_type': 'reanalysis', 'format': 'netcdf', 'variable': ['geopotential', 'temperature', 'specific humidity'], 'year': '2001', 'month': '01', 'pressure_level': ['850', '700', '500']}
{'product_type': 'reanalysis', 'format': 'netcdf', 'variable': ['geopotential', 'temperature', 'specific humidity'], 'year': '2001', 'month': '02', 'pressure_level': ['850', '700', '500']}
{'product_type': 'reanalysis', 'format': 'netcdf', 'variable': ['geopotential', 'temperature', 'specific humidity'], 'year': '2001', 'month': '03', 'pressure_level': ['850', '700', '500']}
{'product_type': 'reanalysis', 'format': 'netcdf', 'var

{'product_type': 'reanalysis', 'format': 'netcdf', 'variable': ['geopotential', 'temperature', 'specific humidity'], 'year': '2004', 'month': '07', 'pressure_level': ['850', '700', '500']}
{'product_type': 'reanalysis', 'format': 'netcdf', 'variable': ['geopotential', 'temperature', 'specific humidity'], 'year': '2004', 'month': '08', 'pressure_level': ['850', '700', '500']}
{'product_type': 'reanalysis', 'format': 'netcdf', 'variable': ['geopotential', 'temperature', 'specific humidity'], 'year': '2004', 'month': '09', 'pressure_level': ['850', '700', '500']}
{'product_type': 'reanalysis', 'format': 'netcdf', 'variable': ['geopotential', 'temperature', 'specific humidity'], 'year': '2004', 'month': '10', 'pressure_level': ['850', '700', '500']}
{'product_type': 'reanalysis', 'format': 'netcdf', 'variable': ['geopotential', 'temperature', 'specific humidity'], 'year': '2004', 'month': '11', 'pressure_level': ['850', '700', '500']}
{'product_type': 'reanalysis', 'format': 'netcdf', 'var

### download request for the data of interest on pressure levels

In [None]:


# variables
variable = ['geopotential', 'temperature']#, 'specific humidity']

# pressure levels
base_level = 'pressure' # 'pressure' or 'single'
pressure_level = ['850', '700', '500']


# define time range: start end in the format YYYY:MM
time_start = '1981:01' # full range start: '1981:01'
time_end = '2017:12' # full range end: '2017:12'

# create savename string
variablestr = "_".join([x.replace(' ', '_') for x in variable])
pressure_levelstr = "_".join([x for x in pressure_level])
savename = f'era5_{variablestr}_{pressure_levelstr}'

# retrieve data
era5_retrieval(savename=savename, area=area, base_level=base_level, variable=variable,
               pressure_level=pressure_level, time_start=time_start, time_end=time_end)