# create_cmip6_globus_batch_files.ipynb
Create Globus batch files and scripts for ESGF CMIP6 data of interest.

B. Grandey, 2022.

In [1]:
# Record the date and time at which this notebook run started
! date

Wed Jan 19 09:54:11 +08 2022


In [2]:
import datetime
import json
import pathlib
import re
import subprocess

import pandas as pd
import pyesgf
from pyesgf.search import SearchConnection

# Report the version of each imported package that defines __version__ (others are skipped)
for pkg in (json, pd, pyesgf, re):
    if not hasattr(pkg, '__version__'):
        continue
    print('{}.__version__ = {}'.format(pkg.__name__, pkg.__version__))

json.__version__ = 2.0.9
pandas.__version__ = 1.3.5
pyesgf.__version__ = 0.3.0
re.__version__ = 2.2.1


## Base paths

In [3]:
# Local directories used by this notebook (each is created if it does not yet exist):
#   out_base  -- batch files and scripts are saved here
#   cache_dir -- the search connection's local cache is saved here
out_base = pathlib.Path('cmip6_globus_batch_files').resolve()
cache_dir = pathlib.Path('cache').resolve()
for directory in (out_base, cache_dir):
    directory.mkdir(exist_ok=True)

## Establish search connection

In [4]:
# Establish search connection to the ESGF search service, with distrib=True (distributed search)
# and a local cache of search results that expires after expire_after
expire_after = datetime.timedelta(days=10)  # cache expiry
conn = SearchConnection('https://esgf-node.llnl.gov/esg-search',
                        distrib=True,
                        # Cache lives inside cache_dir (created in the previous cell) rather than
                        # repeating the 'cache/' path here, so the two locations cannot drift apart
                        cache=str(cache_dir.joinpath('pyesgf_cache')),
                        expire_after=expire_after)
conn

<pyesgf.search.connection.SearchConnection at 0x7ff815c30730>

## Identify suitable sources (models) and members (ripf variants)
Do this by finding source-member pairs that fulfil the following requirements:
1. Monthly data are available for at least one of 'zostoga', 'zos', and 'tas' variables.
2. Data are available for at least one of 'ssp585', 'ssp370', 'ssp245', and 'ssp126' experiments.
3. Data are available for both 'historical' and 'piControl' experiments.
4. The member is an 'r1i1' variant (e.g. 'r1i1p1f1', 'r1i1p5f2').

In [5]:
%%time
# Create dictionary to hold available experiments (list) for each source-member pair (tuple)
source_member_experiment_dict = dict()
# Perform initial search for datasets matching first two requirements above
ctx1 = conn.new_context(project='CMIP6',
                        variable=['zostoga', 'zos', 'tas'],
                        frequency='mon',
                        experiment_id=['ssp585', 'ssp370', 'ssp245', 'ssp126'])
# Loop over available sources
sources = sorted(ctx1.facet_counts['source_id'].keys())
for source_id in sources:
    # Constrain search to source, to identify available members
    ctx2 = ctx1.constrain(source_id=source_id)
    # Find r1i1 members (requirement #4)
    members = sorted(ctx2.facet_counts['member_id'].keys())
    members = [m for m in members if bool(re.match('r1i1', m))]
    # Loop over members
    for member_id in members:
        # Search for available experiments for this source-member pair
        ctx3 = conn.new_context(project='CMIP6',
                                variable=['zostoga', 'zos', 'tas'],
                                frequency='mon',
                                source_id=source_id,
                                member_id=member_id)
        experiments = sorted(ctx3.facet_counts['experiment_id'].keys())
        # Limit to experiments of interest
        experiments = [e for e in experiments if e in ['piControl', 'historical',
                                                       'ssp585', 'ssp370', 'ssp245', 'ssp126']]
        # Are data available for both the historical and piControl experiments?
        if ('historical' in experiments) and ('piControl' in experiments):
            # Save to dictionary
            source_member_experiment_dict[(source_id, member_id)] = experiments
            # Print
            print(f'{source_id} {member_id}: {experiments}')
# Summarise number of source-member pairs identified
print(f'{len(source_member_experiment_dict)} source-member pairs identified.')

ACCESS-CM2 r1i1p1f1: ['historical', 'piControl', 'ssp126', 'ssp245', 'ssp370', 'ssp585']
ACCESS-ESM1-5 r1i1p1f1: ['historical', 'piControl', 'ssp126', 'ssp245', 'ssp370', 'ssp585']
AWI-CM-1-1-MR r1i1p1f1: ['historical', 'piControl', 'ssp126', 'ssp245', 'ssp370', 'ssp585']
BCC-CSM2-MR r1i1p1f1: ['historical', 'piControl', 'ssp126', 'ssp245', 'ssp370', 'ssp585']
BCC-ESM1 r1i1p1f1: ['historical', 'piControl', 'ssp370']
CAMS-CSM1-0 r1i1p1f1: ['historical', 'piControl', 'ssp126', 'ssp245', 'ssp370', 'ssp585']
CAS-ESM2-0 r1i1p1f1: ['historical', 'piControl', 'ssp126', 'ssp245', 'ssp370', 'ssp585']
CESM2 r1i1p1f1: ['historical', 'piControl', 'ssp126', 'ssp245', 'ssp370', 'ssp585']
CESM2-WACCM r1i1p1f1: ['historical', 'piControl', 'ssp126', 'ssp245', 'ssp370', 'ssp585']
CIESM r1i1p1f1: ['historical', 'piControl', 'ssp126', 'ssp245', 'ssp585']
CMCC-CM2-SR5 r1i1p1f1: ['historical', 'piControl', 'ssp126', 'ssp245', 'ssp370', 'ssp585']
CMCC-ESM2 r1i1p1f1: ['historical', 'piControl', 'ssp126', 'ssp

## Function to find Globus URLs and write Globus batch files for a source-member pair

If one wishes to find only one Globus URL for each unique NetCDF file, then it is faster to search for dataset results and skip datasets whose instance_id has already been processed.
However, NetCDF files may then be missed in practice: some Globus URLs may be inaccessible due to problems such as endpoint unavailability or non-existent paths.
Therefore, it makes sense to build in some redundancy by including every Globus URL.

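In light of these considerations, the function below performs a file search (search_type='File'), keeps every Globus URL found, and writes one batch file per Globus source endpoint.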

In [6]:
def _parse_globus_url(globus_url):
    """Split a Globus URL into (endpoint, path).

    For example, 'globus:<endpoint-id>/some/dir/file.nc' gives ('<endpoint-id>', 'some/dir/file.nc').
    Everything after the first '/' is the path; the path is '' if the URL contains no '/'.
    """
    head, _, globus_path = globus_url.partition('/')
    globus_ep = head.replace('globus:', '')
    return globus_ep, globus_path


def _is_globus_endpoint_id(globus_ep):
    """Return True if globus_ep has the form of a Globus endpoint ID (a hyphenated UUID)."""
    return re.fullmatch(r'[0-9a-fA-F]{8}-(?:[0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}', globus_ep) is not None


def _get_globus_endpoint_name(globus_ep):
    """Return the display name of a Globus endpoint, as reported by the Globus CLI.

    Raises subprocess.CalledProcessError if the CLI command fails.
    """
    # argv is passed as a list (no shell), so text taken from ESGF search results
    # is never interpreted by a shell.
    completed = subprocess.run(['globus', 'endpoint', 'show', '-F', 'json', globus_ep],
                               capture_output=True, text=True, check=True)
    return json.loads(completed.stdout)['display_name']


def _make_batch_label(source_id, member_id, frequency, variables, experiments, globus_ep):
    """Return the transfer label for a batch (the batch filename is this label plus '.txt')."""
    if len(variables) == 1:
        var_str = variables[0]
    elif len(variables) == 2:
        var_str = '-'.join(variables)
    else:
        var_str = f'{len(variables)}vars-inc-{variables[0]}'
    if len(experiments) == 1:
        exp_str = experiments[0]
    else:
        exp_str = f'{len(experiments)}exps'
    return f'{source_id}_{member_id}_{frequency}_{var_str}_{exp_str}_{globus_ep}'


def write_globus_batch_files(source_id='ACCESS-CM2',
                             member_id='r1i1p1f1',
                             variables=['zostoga',],
                             frequency='mon',
                             experiments=['piControl', 'historical', 'ssp585', 'ssp370', 'ssp245', 'ssp126'],
                             conn=conn):
    """Find Globus URLs and write Globus batch files for a CMIP6 source-member pair.

    A file search is performed for each variable. Every (non-retracted) Globus URL found is kept,
    and one batch file is written per Globus source endpoint, to out_base/<endpoint>/<label>.txt
    (out_base is the module-level output directory).

    Keyword arguments:
      source_id -- string: ESGF source_id / model (default 'ACCESS-CM2')
      member_id -- string: ESGF member_id / ripf variant (default 'r1i1p1f1')
      variables -- list: variables of interest (default ['zostoga',])
      frequency -- string: time frequency of variable (default 'mon')
      experiments -- list: experiment_id for experiments of interest
          (default ['piControl', 'historical', 'ssp585', 'ssp370', 'ssp245', 'ssp126'])
      conn -- pyesgf SearchConnection (default is the SearchConnection named 'conn'
          at the time this cell is executed)

    Returns:
      batch_fn_ep_dict -- dict: names of the batch files written (keys) with corresponding endpoint (values)

    Raises:
      subprocess.CalledProcessError -- if the Globus CLI fails to describe a source endpoint
    """
    print(f'---- {source_id} {member_id} ----')
    # Globus info etc for search results, keyed by Globus URL (suitably unique to use as index).
    # Records are collected in a dict and the DataFrame is built once below, rather than
    # growing a DataFrame row by row.
    records = dict()
    # Loop over variables
    for v in variables:
        # File search context
        ctx1 = conn.new_context(project='CMIP6',
                                source_id=source_id,
                                member_id=member_id,
                                variable=v,
                                frequency=frequency,
                                experiment_id=experiments,
                                latest=True,
                                search_type='File')
        # Perform search and loop over file results
        file_results = ctx1.search()
        print(f'{v}: {len(file_results)} file results to process.')
        for f in file_results:
            # Skip results marked as retracted (a missing flag is treated as not retracted)
            if f.json.get('retracted', False):
                continue
            globus_url = f.globus_url
            if not globus_url:  # no Globus URL for this file
                continue
            globus_ep, globus_path = _parse_globus_url(globus_url)
            if not _is_globus_endpoint_id(globus_ep):
                print(f'globus_ep = "{globus_ep}" looks suspect. Skipping.')
                continue
            if not globus_path:
                print(f'globus_url = "{globus_url}" has no path. Skipping.')
                continue
            # Target path on local endpoint (relative to $GCP_EP_CMIP6 environment variable)
            instance_id = f.json['dataset_id'].split('|')[0]  # dataset's instance_id
            dest_path = f'{v}/{source_id}_{member_id}/{instance_id}/{f.filename}'
            records[globus_url] = {'variable': v, 'filename': f.filename,
                                   'globus_url': globus_url,
                                   'globus_ep': globus_ep, 'globus_path': globus_path,
                                   'dest_path': dest_path}
        # Print number of URLs saved for this variable (zero is reported rather than raising KeyError)
        n_saved = sum(1 for record in records.values() if record['variable'] == v)
        print(f'{v}: {n_saved} Globus URLs saved.')
    columns = ['variable', 'filename',
               'globus_url',  # URL (also used as index)
               'globus_ep', 'globus_path',  # Globus source endpoint and path
               'dest_path']  # target path on destination endpoint
    globus_df = pd.DataFrame(list(records.values()), index=list(records.keys()), columns=columns)
    # Dict to hold batch filenames (keys) and source endpoints (values)
    batch_fn_ep_dict = dict()
    # Loop over source endpoints, most files first
    for globus_ep in globus_df['globus_ep'].value_counts().index:
        # Select subset of data for this endpoint
        ep_df = globus_df[globus_df['globus_ep'] == globus_ep]
        ep_name = _get_globus_endpoint_name(globus_ep)
        print(f'{ep_name}: {len(ep_df)} files in batch.')
        # Label for transfer, and filename of batch file to write
        batch_label = _make_batch_label(source_id, member_id, frequency, variables, experiments, globus_ep)
        batch_fn = f'{batch_label}.txt'
        # Directory in which to write batch file
        batch_dir = out_base.joinpath(globus_ep)
        batch_dir.mkdir(exist_ok=True)
        # Write batch file: commented header with CLI instructions, then one "source dest" line per file
        with open(batch_dir.joinpath(batch_fn), 'w', encoding='utf-8') as writer:
            writer.write('# Written by write_globus_batch_files() in create_cmip6_globus_batch_files.ipynb '
                         + f'on {datetime.date.today()}\n')
            writer.write(f'# Globus endpoint is {globus_ep} (Name: {ep_name}).\n')
            writer.write(f'# {len(ep_df)} files in batch.\n')
            writer.write('# To activate source endpoint use Globus CLI:\n')
            writer.write(f'# globus endpoint activate --web {globus_ep}\n')
            writer.write('# To submit transfer use Globus CLI:\n')
            writer.write(f'# globus transfer {globus_ep} $GCP_EP_CMIP6 --batch {batch_fn} '
                         + '--preserve-mtime --fail-on-quota-errors --skip-source-errors --sync-level checksum '
                         + f'--label "{batch_label}"\n')
            writer.write('# Replace $GCP_EP_CMIP6 with intended destination endpoint, including base path.\n')
            writer.write('\n')
            for globus_path, dest_path in zip(ep_df['globus_path'], ep_df['dest_path']):
                writer.write(f'{globus_path} {dest_path}\n')
        print(f'Written {batch_fn} ({len(ep_df)} files).')
        batch_fn_ep_dict[batch_fn] = globus_ep
    return batch_fn_ep_dict

## TODO: Write batch files for source-member pairs identified above

In [7]:
# Accumulates every batch_fn_ep_dict returned by write_globus_batch_files():
# batch filename (key) -> Globus source endpoint (value)
main_batch_fn_ep_dict = {}

In [8]:
# TODO

## Write batch files for specific combinations of source-member pair, variables, frequency, and experiments
Specific custom combinations can be added in this section.

In [9]:
# Custom combinations to process, each a tuple of
# (source_id, member_id, variables, frequency, experiments)
comb_list = [
    ('ACCESS-CM2', 'r1i1p1f1', ['thetao',], 'mon', ['ssp585',],),  # test 3D variable
]
# Write batch files for each combination, merging the returned
# {batch filename: endpoint} mapping into main_batch_fn_ep_dict
for source_id, member_id, variables, frequency, experiments in comb_list:
    batch_files = write_globus_batch_files(source_id=source_id,
                                           member_id=member_id,
                                           variables=variables,
                                           frequency=frequency,
                                           experiments=experiments)
    main_batch_fn_ep_dict.update(batch_files)

---- ACCESS-CM2 r1i1p1f1 ----
thetao: 65 file results to process.
thetao: 53 Globus URLs saved.
NCI ESGF: 29 files in batch.
Written ACCESS-CM2_r1i1p1f1_mon_thetao_ssp585_2058c7d6-a79f-11e6-9ad6-22000a1e3b52.txt (29 files).
LLNL ESGF: 12 files in batch.
Written ACCESS-CM2_r1i1p1f1_mon_thetao_ssp585_415a6320-e49c-11e5-9798-22000b9da45e.txt (12 files).
CEDA ESGF DN1: 12 files in batch.
Written ACCESS-CM2_r1i1p1f1_mon_thetao_ssp585_ee3aa1a0-7e4c-11e6-afc4-22000b92c261.txt (12 files).


In [10]:
# Batch filenames written (keys) and their Globus source endpoints (values); a bare last
# expression uses IPython's pretty display rather than a single print() line
main_batch_fn_ep_dict

{'ACCESS-CM2_r1i1p1f1_mon_thetao_ssp585_2058c7d6-a79f-11e6-9ad6-22000a1e3b52.txt': '2058c7d6-a79f-11e6-9ad6-22000a1e3b52', 'ACCESS-CM2_r1i1p1f1_mon_thetao_ssp585_415a6320-e49c-11e5-9798-22000b9da45e.txt': '415a6320-e49c-11e5-9798-22000b9da45e', 'ACCESS-CM2_r1i1p1f1_mon_thetao_ssp585_ee3aa1a0-7e4c-11e6-afc4-22000b92c261.txt': 'ee3aa1a0-7e4c-11e6-afc4-22000b92c261'}


In [11]:
# Record the date and time at which this notebook run finished
! date

Wed Jan 19 09:54:21 +08 2022
