# Downloading from the ESGF archive

This notebook runs through all the steps needed to download CORDEX data from the Earth System Grid Federation (ESGF) archive, from a single dataset up to multiple ensembles spanning various models, variables, and experiments. Before running any code, check the 'Before you start' section to make sure your setup is complete.

### CORDEX

CORDEX, the Coordinated Regional Climate Downscaling Experiment, is an internationally coordinated effort to produce regional climate model data for several of the world's key regions. Boundary conditions for each region are provided by an ensemble of General Circulation Models (GCMs), with high-resolution Regional Climate Models (RCMs) handling the dynamics within the region. The project has standardised a number of experiments for each GCM-RCM pair to run, including a historical run and one for each Representative Concentration Pathway (RCP). The full dataset can be browsed manually at https://esgf-data.dkrz.de/search/cordex-dkrz/. For the Pyrenees, the relevant domain is Europe at 0.11° resolution, which has the code 'EUR-11'.

Even for a single variable and a handful of experiments, the available data runs to dozens of gigabytes spread across hundreds of individual files. For this reason, it is significantly more convenient to download the data programmatically. This notebook sets out how to do that using a handful of libraries, notably pyesgf for querying the ESGF database and aiohttp / asyncio for downloading files asynchronously.

### Before you start

Before you start, please ensure:

- All libraries imported below are installed via either pip or conda (installation instructions are available in each library's online documentation)
- You have an ESGF account on the German node: https://esgf-data.dkrz.de
- You have approved access to the CORDEX datasets. Apply here: https://esg-dn1.nsc.liu.se/ac/subscribe/CORDEX_Research
    (you may need to apply twice before your account is flagged as approved)
- Environment variables are saved for:
    - ESGF_USERNAME - the ESGF username for your German-node account
    - ESGF_PASSWORD - the ESGF password for your German-node account
    - DATA_HOME - a local path to the directory in which you want to store this data

You can save environment variables for the current session with the terminal commands:

        export ESGF_USERNAME=myusername
        export ESGF_PASSWORD=mypassword
        export DATA_HOME=path/to/data
        
Or save them for all future sessions by copying the above commands into your bash profile (~/.bashrc).
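
Before running the cells below, it can help to confirm that the three variables are actually visible to Python. The following is a minimal sketch; the helper name `missing_env_vars` is hypothetical and not part of the original notebook.

```python
import os

# the three variable names listed in the 'Before you start' section
REQUIRED_VARS = ('ESGF_USERNAME', 'ESGF_PASSWORD', 'DATA_HOME')

def missing_env_vars(names=REQUIRED_VARS, environ=None):
    '''
    Returns the names from `names` that are unset or empty in `environ`
    (defaults to the environment of the current process).
    '''
    environ = os.environ if environ is None else environ
    return [name for name in names if not environ.get(name)]

missing = missing_env_vars()
if missing:
    print('missing environment variables:', ', '.join(missing))
else:
    print('all required environment variables are set')
```

If anything is reported missing, set it in the terminal and restart the notebook server so the new value is picked up.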

In [1]:
import os
import ssl
import pyesgf
import aiohttp
import asyncio
import xarray as xr
from itertools import product
from pyesgf.logon import LogonManager
from pyesgf.search import SearchConnection

import nest_asyncio
nest_asyncio.apply()

In [2]:
# define your query
query = {
    'project': 'CORDEX',
    'domain': 'EUR-11',
    'experiment': 'rcp85',
    'variable': 'tas',
    'time_frequency': 'mon',
    'ensemble': 'r1i1p1'
}

# ensure the following are saved as environment variables
USERNAME = os.environ['ESGF_USERNAME']
PASSWORD = os.environ['ESGF_PASSWORD']
DATA_PATH = os.environ['DATA_HOME']

In [3]:
# check ESGF for number of datasets that satisfy query
conn = SearchConnection('http://esgf-data.dkrz.de/esg-search', distrib=True)
context = conn.new_context(**query, facets=query.keys())
context.hit_count

46

In [4]:
# login to ESGF and generate SSL context
myproxy_host = 'esgf-data.dkrz.de'

lm = LogonManager()
lm.logon(username=USERNAME, password=PASSWORD, hostname=myproxy_host)

sslcontext = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
sslcontext.load_verify_locations(capath=lm.esgf_certs_dir)
sslcontext.load_cert_chain(lm.esgf_credentials)

lm.is_logged_on()

True

In [5]:
# generate results and check an example dataset to verify all is working as expected
results = context.search()
example_dataset = results[0]
example_dataset.dataset_id

'cordex.output.EUR-11.CLMcom.MPI-M-MPI-ESM-LR.rcp85.r1i1p1.CCLM4-8-17.v1.mon.tas.v20140515|esgf1.dkrz.de'
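
The dataset id packs several facets into one string. As a rough illustration, the sketch below splits the example id above into named fields. The field order is inferred from that single example, and `DatasetId` and `parse_dataset_id` are hypothetical names, not part of pyesgf.

```python
from collections import namedtuple

# field order assumed from the example id above; other projects may differ
DatasetId = namedtuple('DatasetId', [
    'project', 'product', 'domain', 'institute', 'gcm', 'experiment',
    'ensemble', 'rcm', 'downscaling', 'frequency', 'variable', 'version',
    'data_node',
])

def parse_dataset_id(dataset_id):
    '''Splits '<dotted.id>|<data_node>' into a DatasetId of named fields.'''
    dotted, data_node = dataset_id.split('|')
    parts = dotted.split('.')
    if len(parts) != 12:
        raise ValueError('expected 12 dot-separated fields, got {}'.format(len(parts)))
    return DatasetId(*parts, data_node)

example = parse_dataset_id(
    'cordex.output.EUR-11.CLMcom.MPI-M-MPI-ESM-LR.rcp85.r1i1p1.CCLM4-8-17.v1.mon.tas.v20140515|esgf1.dkrz.de'
)
print(example.gcm, example.rcm, example.variable, example.data_node)
```

Splitting on `|` first matters because the data node hostname itself contains dots.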

In [6]:
# and now an example file within that dataset, including its http download link
example_files = example_dataset.file_context().search(ignore_facet_check=True)
example_file = example_files[0]
example_file.download_url

'http://esgf1.dkrz.de/thredds/fileServer/cordex/cordex/output/EUR-11/CLMcom/MPI-M-MPI-ESM-LR/rcp85/r1i1p1/CLMcom-CCLM4-8-17/v1/mon/tas/v20140515/tas_EUR-11_MPI-M-MPI-ESM-LR_rcp85_r1i1p1_CLMcom-CCLM4-8-17_v1_mon_200601-201012.nc'
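
The filename at the end of the download link also encodes the period each file covers, which can be handy for checking that a downloaded dataset has no gaps. The sketch below assumes the nine-field, underscore-separated layout seen in the example above; `parse_cordex_filename` is a hypothetical helper, and files that do not follow this layout raise an error.

```python
import os
from urllib.parse import urlparse

def parse_cordex_filename(url_or_name):
    '''
    Splits a CORDEX-style filename (or a URL ending in one) into its
    underscore-separated fields, plus the start and end of its period.
    '''
    name = os.path.basename(urlparse(url_or_name).path)
    stem, _extension = os.path.splitext(name)
    fields = stem.split('_')
    if len(fields) != 9:
        raise ValueError('expected 9 underscore-separated fields, got {}'.format(len(fields)))
    keys = ('variable', 'domain', 'gcm', 'experiment', 'ensemble',
            'rcm', 'downscaling', 'frequency', 'period')
    parsed = dict(zip(keys, fields))
    parsed['start'], parsed['end'] = parsed['period'].split('-')
    return parsed

parsed = parse_cordex_filename(
    'http://esgf1.dkrz.de/thredds/fileServer/cordex/cordex/output/EUR-11/CLMcom/MPI-M-MPI-ESM-LR/rcp85/r1i1p1/CLMcom-CCLM4-8-17/v1/mon/tas/v20140515/tas_EUR-11_MPI-M-MPI-ESM-LR_rcp85_r1i1p1_CLMcom-CCLM4-8-17_v1_mon_200601-201012.nc'
)
print(parsed['variable'], parsed['start'], parsed['end'])
```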

In [201]:
# define several helpful functions
def generate_dataset_local_path(
            dataset: pyesgf.search.results.DatasetResult, 
            home_path: str
    ):
    '''
    Takes pyesgf dataset object and returns the path (as a string) at which that dataset would be stored locally, format:
        $DATA_HOME/project/domain/variable/experiment/ensemble/gcm/rcm
    '''
    # dataset_id has the form '<dot-separated facets>|<data node>'
    dataset_name, data_node = dataset.dataset_id.split('|')
    project, product, domain, institute, gcm, experiment, ensemble, rcm, downscaling, frequency, variable, version = dataset_name.split('.')

    path = os.path.join(home_path, project, domain, variable, experiment, ensemble, gcm, rcm)

    return path

In [202]:
async def download_file(
            session: aiohttp.ClientSession, 
            file: pyesgf.search.results.FileResult, 
            local_directory: str
    ):
    '''
    Coroutine which takes a aiohttp client session and pyesgf file object and downloads it to a local directory
    '''
    url = file.download_url
    if 'HadREM3-GA7-05' in url:
        # hack to avoid http error; str.replace returns a new string, so reassign it
        url = url.replace('v20201111', 'latest')

    filename = file.filename
    filepath = os.path.join(local_directory, filename)

    # if file already exists, skip it
    if os.path.isfile(filepath):
        return

    # request the file and stream it to disk in chunks
    async with session.request('get', url, ssl=sslcontext) as response:
        response.raise_for_status() # fail loudly rather than saving an error page as data

        temp_filepath = filepath+'.inprogress' # temporary filename whilst downloading
        chunk_size = 2048
        with open(temp_filepath, 'wb') as local_file:
            async for chunk in response.content.iter_chunked(chunk_size):
                local_file.write(chunk)
        os.rename(temp_filepath, filepath) # remove .inprogress suffix once the file is closed
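
The `.inprogress` suffix means a file only appears under its final name once every chunk has been written, so an interrupted run never leaves a truncated file that looks complete. The same pattern can be sketched without any network code. `write_then_rename` is a hypothetical helper, and the chunks here are hard-coded stand-ins for the HTTP response body.

```python
import os
import tempfile

def write_then_rename(chunks, filepath, suffix='.inprogress'):
    '''
    Writes an iterable of bytes chunks to filepath + suffix, then renames
    the result to filepath once every chunk has been written.
    '''
    temp_filepath = filepath + suffix
    with open(temp_filepath, 'wb') as local_file:
        for chunk in chunks:
            local_file.write(chunk)
    # rename only after the file is closed; os.replace also overwrites
    # an existing destination file
    os.replace(temp_filepath, filepath)
    return filepath

with tempfile.TemporaryDirectory() as demo_dir:
    target = os.path.join(demo_dir, 'example.nc')
    write_then_rename([b'abc', b'def'], target)
    demo_listing = sorted(os.listdir(demo_dir))
    with open(target, 'rb') as f:
        demo_bytes = f.read()

print(demo_listing, demo_bytes)
```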

In [203]:
async def download_multiple(
            loop: asyncio.AbstractEventLoop,
            files: pyesgf.search.results.ResultSet,
            local_directory: str
    ):
    '''
    Coroutine that takes asyncio loop object and pyesgf files object and asynchronously downloads them to
    a local directory.
    '''
    async with aiohttp.ClientSession(loop=loop) as session:
        tasks = [download_file(session, file, local_directory) for file in files]
        await asyncio.gather(*tasks)
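
`asyncio.gather` starts every download in the dataset at once. If a data node starts refusing connections, capping the number of simultaneous requests may help. The sketch below shows one way to do that with `asyncio.Semaphore`. `gather_with_limit` is a hypothetical helper, `fake_download` stands in for `download_file`, and the limits used are arbitrary.

```python
import asyncio

async def gather_with_limit(coroutine_functions, limit=4):
    '''
    Runs zero-argument coroutine functions concurrently, with at most
    `limit` of them in flight at any one time.
    '''
    semaphore = asyncio.Semaphore(limit)

    async def run_one(func):
        async with semaphore:
            return await func()

    return await asyncio.gather(*(run_one(func) for func in coroutine_functions))

async def demo(n_files=10, limit=3):
    in_flight = 0
    peak = 0

    async def fake_download():
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stands in for the network transfer
        in_flight -= 1

    await gather_with_limit([fake_download] * n_files, limit=limit)
    return peak

peak_concurrency = asyncio.run(demo())
print('peak concurrent downloads:', peak_concurrency)
```

In the notebook itself, each real download could be wrapped as `lambda: download_file(session, file, local_directory)` and passed to the same helper.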

In [210]:
def remove_incomplete_files(directory):
    '''
    removes any files in a certain directory with the '.inprogress' suffix
    '''
    filenames = os.listdir(directory)
    incomplete = [os.path.join(directory, filename) for filename in filenames if filename.endswith('.inprogress')]
    for file in incomplete:
        os.remove(file)
    if not os.listdir(directory): # if directory empty
        os.rmdir(directory)

In [211]:
def download_dataset(
            dataset: pyesgf.search.results.DatasetResult, 
            local_path: str,
            verbose: bool = False
    ):
    '''
    Takes pyesgf dataset object, creates local directory for dataset to be stored, extracts file objects
    then asynchronously downloads all files.
    '''
    # create all appropriate directories if they do not already exist
    directory = generate_dataset_local_path(dataset, local_path)
    os.makedirs(directory, exist_ok=True)

    # extract files
    files = dataset.file_context().search(ignore_facet_check=True)

    # create loop for asynchronous downloads
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(download_multiple(loop, files, directory))
        verbose and print('---> done!')
        return True
    except Exception as err:
        verbose and print('\n\tencountered an error:', repr(err))
        remove_incomplete_files(directory)
        return False

In [206]:
def download_ensemble(
            context: pyesgf.search.context.DatasetSearchContext, 
            verbose: bool = False
    ):
    ''' 
    Takes a pyesgf context object and downloads all available datasets that satisfy the constraints of
    that context. Datasets are downloaded one by one, but files within each dataset are downloaded
    asynchronously.
    '''
    # get all matching datasets for context
    results = context.search()

    # initialise outputs
    successfully_downloaded = set()
    encountered_errors = set()


    for i, dataset in enumerate(results):
        verbose and print('downloading {} of {}: {}'.format(i+1, len(results), dataset.dataset_id), end=' ')

        try:
            success = download_dataset(dataset, DATA_PATH, verbose) # True if all files downloaded successfully
            if success:
                successfully_downloaded.add(dataset.dataset_id)
            else:
                encountered_errors.add('\t'+dataset.dataset_id)
        except KeyboardInterrupt:
            directory = generate_dataset_local_path(dataset, DATA_PATH)
            remove_incomplete_files(directory)
            break

    print('\nsuccessfully downloaded {} of {} datasets'.format(len(successfully_downloaded), len(results)))
    if encountered_errors:
        print('the following datasets were not downloaded due to encountering errors during the process:')
        print(*encountered_errors, sep='\n')

    return {
        'success': successfully_downloaded,
        'errors': encountered_errors
    }

In [209]:
# call download_ensemble on the context generated by your query
downloads = download_ensemble(context, verbose=True)

downloading 1 of 46: cordex.output.EUR-11.CLMcom.MPI-M-MPI-ESM-LR.rcp85.r1i1p1.CCLM4-8-17.v1.mon.tas.v20140515|esgf1.dkrz.de ---> done!
downloading 2 of 46: cordex.output.EUR-11.CLMcom.MOHC-HadGEM2-ES.rcp85.r1i1p1.CCLM4-8-17.v1.mon.tas.v20150320|esgf1.dkrz.de ---> done!
downloading 3 of 46: cordex.output.EUR-11.KNMI.ICHEC-EC-EARTH.rcp85.r1i1p1.RACMO22E.v1.mon.tas.v20140324|esgf1.dkrz.de ---> done!
downloading 4 of 46: cordex.output.EUR-11.CLMcom.CNRM-CERFACS-CNRM-CM5.rcp85.r1i1p1.CCLM4-8-17.v1.mon.tas.v20140515|esgf1.dkrz.de ---> done!
downloading 5 of 46: cordex.output.EUR-11.KNMI.MOHC-HadGEM2-ES.rcp85.r1i1p1.RACMO22E.v2.mon.tas.v20160705|esgf1.dkrz.de ---> done!
downloading 6 of 46: cordex.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.rcp85.r1i1p1.REMO2009.v1.mon.tas.v20160525|esgf1.dkrz.de ---> done!
downloading 7 of 46: cordex.output.EUR-11.RMIB-UGent.CNRM-CERFACS-CNRM-CM5.rcp85.r1i1p1.ALARO-0.v1.mon.tas.v20170207|esgf1.dkrz.de ---> done!
downloading 8 of 46: cordex.output.EUR-11.UHOH.MPI

NB: If you're only downloading a handful of variables or experiments, the code above is sufficient. The following code blocks are only needed when you want to leave the code running for a long time (e.g. overnight) and queue everything up for one big execution.

In [213]:
# or make multiple queries

queries = {
    'project': ['CORDEX'],
    'domain': ['EUR-11'],
    'experiment': ['historical', 'rcp26', 'rcp85'],
    'variable': ['tas', 'pr'],
    'time_frequency': ['mon'],
    'ensemble': ['r1i1p1']
}

def make_multiple_queries(queries: dict):
    '''
    Takes a queries dictionary with ESGF facets as keys and lists of strings as values.
    Prints a table of all configurations of query and the number of datasets that match.
    Returns a dictionary of contexts for each configuration.
    Has the structure -> dict[config] = context
    '''
    headers = queries.keys()
    values = queries.values()
    h = len(headers)
    contexts = {}

    print('querying ESGF...')
    print('found the following datasets matching your queries:\n')

    print(('{:<14} '*h).format(*headers), 'hit_count')
    for config in product(*values):
        query = {key: value for key, value in zip(headers, config)}
        context = conn.new_context(**query, facets=headers)
        contexts[config] = context
        hit_count = context.hit_count
        print(('{:<14} '*h).format(*config), hit_count)

    return contexts

# call function
contexts = make_multiple_queries(queries=queries)

querying ESGF...
found the following datasets matching your queries:

project        domain         experiment     variable       time_frequency ensemble        hit_count
CORDEX         EUR-11         historical     tas            mon            r1i1p1          48
CORDEX         EUR-11         historical     pr             mon            r1i1p1          48
CORDEX         EUR-11         rcp26          tas            mon            r1i1p1          23
CORDEX         EUR-11         rcp26          pr             mon            r1i1p1          23
CORDEX         EUR-11         rcp85          tas            mon            r1i1p1          46
CORDEX         EUR-11         rcp85          pr             mon            r1i1p1          46
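
Each row of the table is one element of the Cartesian product of the value lists. The sketch below isolates that expansion step so it can be inspected without contacting ESGF. `expand_queries` is a hypothetical helper, and the demo dictionary is a cut-down version of `queries`.

```python
from itertools import product

def expand_queries(queries):
    '''
    Expands a dict of facet -> list of values into one single-valued
    query dict per combination, in itertools.product order.
    '''
    facets = list(queries)
    return [dict(zip(facets, config)) for config in product(*queries.values())]

demo_queries = {
    'experiment': ['historical', 'rcp26', 'rcp85'],
    'variable': ['tas', 'pr'],
}
expanded = expand_queries(demo_queries)
for single_query in expanded:
    print(single_query)
```

The number of configurations is the product of the list lengths, so adding values to several facets at once grows the download queue quickly.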


In [36]:
# and then queue them to download one after the other

def download_multiple_ensembles(queries:dict, verbose:bool=False):
    '''
    Takes a queries dictionary in the same form as make_multiple_queries. Carries out said queries
    and then requests confirmation from the user to proceed. Following confirmation, continues to
    download each ensemble one by one
    '''
    contexts = make_multiple_queries(queries)
    successfully_downloaded = set()
    encountered_errors = set()

    # make_multiple_queries has already printed the table of matching datasets
    response = input('proceed? (y/n) ')
    if response != 'y':
        return

    for config in contexts:

        print('\ndownloading next config:', *config)
        context = contexts[config]
        downloads = download_ensemble(context, verbose)
        # download_ensemble returns a dict of two sets, so merge each set directly
        successfully_downloaded |= downloads['success']
        encountered_errors |= downloads['errors']

    print('\nall downloads now complete. successfully downloaded {} out of {} datasets.'.format(
                len(successfully_downloaded),
                len(successfully_downloaded)+len(encountered_errors)
    ))
    print('a full list of datasets omitted due to errors is available below:\n')
    print(*encountered_errors, sep='\n')

# pull the trigger
download_multiple_ensembles(queries=queries)

querying ESGF...
found the following datasets matching your queries:

project        domain         experiment     variable       time_frequency ensemble        hit_count
CORDEX         EUR-11         historical     tas            mon            r1i1p1          48
CORDEX         EUR-11         historical     pr             mon            r1i1p1          48
CORDEX         EUR-11         rcp26          tas            mon            r1i1p1          23
CORDEX         EUR-11         rcp26          pr             mon            r1i1p1          23
CORDEX         EUR-11         rcp85          tas            mon            r1i1p1          46
CORDEX         EUR-11         rcp85          pr             mon            r1i1p1          46

great!


In [183]:
def check_for_corrupt(path_to_data):
    '''
    Very ugly function to verify all datasets have been downloaded properly and
    check for any files that may be corrupt.
    '''

    path = os.path.join(path_to_data, 'cordex', 'EUR-11')
    variables = [var for var in os.listdir(path) if not '.DS_Store' in var]
    errors = []

    for variable in variables:
        new_path = os.path.join(path, variable)
        experiments = [ex for ex in os.listdir(new_path) if not '.DS_Store' in ex]
        for experiment in experiments:
            new2path = os.path.join(new_path, experiment, 'r1i1p1')
            gcms = [gcm for gcm in os.listdir(new2path) if not '.DS_Store' in gcm]
            for gcm in gcms:
                new3path = os.path.join(new2path, gcm)
                rcms = [rcm for rcm in os.listdir(new3path) if not '.DS_Store' in rcm]
                for rcm in rcms:
                    new4path = os.path.join(new3path, rcm)
                    filenames = os.listdir(new4path)
                    filepaths = [os.path.join(new4path, filename) for filename in filenames if '.DS_Store' not in filename]
                    try:
                        # open and immediately close; only readability is being tested
                        with xr.open_mfdataset(filepaths):
                            pass
                    except Exception:
                        errors.append(new4path)

    print(*errors, sep='\n')

check_for_corrupt(DATA_PATH)
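
The nested loops above hard-code the directory depth and the 'r1i1p1' ensemble. A flatter alternative, sketched here under the assumption that every directory directly containing `.nc` files is one dataset, is to walk the tree with `os.walk`. `find_unreadable_datasets` and `reject_empty` are hypothetical names, and the demo uses a stand-in opener so that it runs without xarray or real data.

```python
import os
import tempfile

def find_unreadable_datasets(root, opener, extension='.nc'):
    '''
    Walks `root`, calls opener(filepaths) on every directory that directly
    contains files ending in `extension`, and returns the directories for
    which the opener raised an exception.
    '''
    failed = []
    for directory, _subdirs, filenames in os.walk(root):
        filepaths = sorted(
            os.path.join(directory, name)
            for name in filenames if name.endswith(extension)
        )
        if not filepaths:
            continue
        try:
            opener(filepaths)
        except Exception:
            failed.append(directory)
    return sorted(failed)

# stand-in opener for the demo: treats empty files as corrupt
def reject_empty(filepaths):
    for path in filepaths:
        if os.path.getsize(path) == 0:
            raise OSError('empty file: ' + path)

with tempfile.TemporaryDirectory() as demo_root:
    good = os.path.join(demo_root, 'tas', 'rcp85', 'good_model')
    bad = os.path.join(demo_root, 'tas', 'rcp85', 'bad_model')
    os.makedirs(good)
    os.makedirs(bad)
    with open(os.path.join(good, 'a.nc'), 'wb') as f:
        f.write(b'data')
    open(os.path.join(bad, 'b.nc'), 'wb').close()
    demo_failed = [os.path.relpath(d, demo_root)
                   for d in find_unreadable_datasets(demo_root, reject_empty)]

print(demo_failed)
```

With real data, passing `xr.open_mfdataset` as the opener should give a check similar to `check_for_corrupt`. Filtering on the file extension also skips stray files such as `.DS_Store` without naming them explicitly.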


