In [1]:
'''
Estimate the number of lethal heat hours per day in CMIP6 data using the lookup method.

Maximum and minimum temperature and minimum and mean relative humidity are assumed
to exist in a Google Cloud Storage bucket. These are pulled down in the analysis.
The resulting tolh files (time over lethal heat) are pushed back to the bucket at the end.
'''

import numpy as np
from lethal_heat import Vecellio22
import os.path as path
import xarray as xr
from dask.distributed import Client
import dask.delayed as delayed
import dask
import glob
import subprocess
from datetime import datetime

In [None]:
# Start the Dask client used for the parallel analysis:
# five workers, each restricted to a single thread.
client = Client(n_workers=5, threads_per_worker=1)

In [2]:
def par_loop(fp_tmax, fp_tmin, fp_rmin, fp_rmean, fp_ii, fp_lookup):
    '''
    Look up the hours over lethal heat for one set of input files and save the result.

    Designed to be wrapped in a delayed object and run once per set of input files.
    The mean and half-range of temperature, and the mean and (mean - min) range of
    relative humidity, are converted to integer indices into the 4-D
    'hours_over_lh' lookup array, which is then sampled at every grid point.

    Parameters
    ----------
    fp_tmax : path to a netCDF file containing 'tasmax'
    fp_tmin : path to a netCDF file containing 'tasmin'
    fp_rmin : path to a netCDF file containing 'hursmin'
    fp_rmean : path to a netCDF file containing 'hurs'
    fp_ii : path of the netCDF file to write ('hours_over_lh' on time/lat/lon)
    fp_lookup : path to the lookup netCDF file containing 'hours_over_lh'

    Returns
    -------
    None. The result is written to fp_ii.
    '''

    # Load everything into memory inside context managers so that every file
    # handle is released as soon as its values have been read.
    with xr.open_dataset(fp_tmax) as ds_tmax:
        da_tmax = ds_tmax['tasmax'] - 273.15  # presumably Kelvin -> Celsius; confirm input units
        lat = da_tmax.lat.values
        lon = da_tmax.lon.values
        time = da_tmax.time.values
        t_max = da_tmax.values
    with xr.open_dataset(fp_tmin) as ds_tmin:
        t_min = (ds_tmin['tasmin'] - 273.15).values
    with xr.open_dataset(fp_rmin) as ds_rmin:
        r_min = ds_rmin['hursmin'].values
    with xr.open_dataset(fp_rmean) as ds_rmean:
        r_mean = ds_rmean['hurs'].values
    with xr.open_dataset(fp_lookup) as ds_lookup:
        lookup = ds_lookup['hours_over_lh'].values

    # Make output, reusing the coordinates of the tasmax file
    ds_out = xr.Dataset()
    ds_out['lat'] = lat
    ds_out['lon'] = lon
    ds_out['time'] = time

    # Mean and ranges in t and rh at every point.
    t_mean = (t_max + t_min) / 2
    t_amp = np.abs( t_max - t_min ) / 2
    r_amp = np.abs( r_min - r_mean )

    # TOLH indices for every point. The offsets and step sizes (25 and .2 for
    # mean temperature, .2 for mean RH, .5 for both ranges) must match the grid
    # the lookup file was built on -- TODO confirm against the lookup notebook.
    # The small epsilon nudges exact .5 values upwards, since np.round would
    # otherwise round halves to the nearest even integer.
    # NOTE(review): NaN inputs do not survive the integer cast; after clipping
    # they receive a lookup value rather than NaN. Confirm this is intended.
    t_mean_ind = np.round( (t_mean - 25) / .2 + 0.00001 ).astype(int)
    r_mean_ind = np.round( (r_mean) / .2 + 0.00001 ).astype(int)
    t_amp_ind = np.round( (t_amp) /.5 + 0.00001 ).astype(int)
    r_amp_ind = np.round( (r_amp) /.5 + 0.00001 ).astype(int)

    # Clip indices so out-of-range values use the nearest edge of the lookup
    # table (axis lengths 77, 501, 61 and 61).
    t_mean_ind = np.clip(t_mean_ind, 0, 77 -1)
    r_mean_ind = np.clip(r_mean_ind, 0, 501-1)
    t_amp_ind = np.clip(t_amp_ind, 0, 61-1)
    r_amp_ind = np.clip(r_amp_ind, 0, 61-1)

    # Sample the lookup table at every point in one vectorised indexing step
    output = lookup[t_mean_ind, r_mean_ind, t_amp_ind, r_amp_ind]

    # assumes the input arrays are ordered (time, lat, lon) -- TODO confirm
    ds_out['hours_over_lh'] = (['time','lat','lon'], output)
    ds_out.to_netcdf(fp_ii)
    return


In [None]:
# CMIP6 models to process; each name is used to build the input filenames
models = [
    'ACCESS-CM2',
    'ACCESS-ESM1-5',
    'CNRM-CM6-1-HR',
    'CNRM-CM6-1',
    'CNRM-ESM2-1',
    'CanESM5',
    'EC-Earth3-Veg-LR',
    'FGOALS-g3',
    'GFDL-CM4',
    'INM-CM4-8',
    'INM-CM5-0',
    'IPSL-CM6A-LR',
    'MIROC-ES2L',
    'MIROC6',
    'MPI-ESM1-2-HR',
    'MPI-ESM1-2-LR',
    'MRI-ESM2-0',
]
n_models = len(models)

# Years to analyse: year0 is the first year, year1 is exclusive
year0, year1 = 1971, 2101
yearL = np.arange(year0, year1).astype(int)

# Scenario label for this analysis
scenario = 'ssp585'

# Analyze models one at a time: download the inputs, compute every year in
# parallel, upload the results, then clear the local working directories.

# Locations (identical for every model)
dir_download = './downloads'
dir_remote = '<remote google bucket directory>'
dir_tmp = '<Directory for temporary files>'
fp_lookup = './tolh_lookup.nc' # Lookup file from create_lethal_heat_lookup.ipynb

# Filename templates for input datasets ({0} = model name, {1} = year)
fp_tmax_tmp = 'tasmax_{0}_ssp585_basd_0.5deg_{1}.nc'
fp_tmin_tmp = 'tasmin_{0}_ssp585_basd_0.5deg_{1}.nc'
fp_rmin_tmp = 'hursmin_{0}_ssp585_derived_from_basd_data_0.5deg_{1}.nc'
fp_rmean_tmp = 'hurs_{0}_ssp585_basd_0.5deg_{1}.nc'
fp_ii = path.join(dir_tmp, 'tolh_{0}_ssp585_{1}.nc')

# Make delayed version of par_loop (the same wrapper serves every model)
par_loop_del = delayed(par_loop)


def _run_quiet(cmd, check=False):
    '''
    Run a shell command with stdout and stderr discarded.

    With check=True a non-zero exit status raises subprocess.CalledProcessError.
    '''
    return subprocess.run(cmd, shell=True, check=check,
                          stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


def _download_files(varname, filenames, dir_remote, dir_download):
    '''
    Copy the given remote files into dir_download with a single gsutil call.

    varname is substituted into dir_remote via str.format. The filename is
    appended directly to the formatted directory, so dir_remote is expected to
    end with a path separator. Raises subprocess.CalledProcessError if gsutil
    reports a failure.
    '''
    sources = ' '.join(dir_remote.format(varname) + filename for filename in filenames)
    _run_quiet(f'gsutil -m cp {sources} {dir_download}', check=True)


for model in models:

    try:

        # Get lists of filenames to download (one file per year)
        fp_tmax_list = [fp_tmax_tmp.format(model, year) for year in yearL]
        fp_tmin_list = [fp_tmin_tmp.format(model, year) for year in yearL]
        fp_rmin_list = [fp_rmin_tmp.format(model, year) for year in yearL]
        fp_rmean_list = [fp_rmean_tmp.format(model, year) for year in yearL]

        # Download temperature max / min and relative humidity min / mean
        _download_files('maximum_temperature', fp_tmax_list, dir_remote, dir_download)
        _download_files('minimum_temperature', fp_tmin_list, dir_remote, dir_download)
        _download_files('minimum_relative_humidity', fp_rmin_list, dir_remote, dir_download)
        _download_files('average_relative_humidity', fp_rmean_list, dir_remote, dir_download)

        # Build one delayed task per annual file
        n_files = len(fp_tmax_list)
        del_list = []
        for ii in range(n_files):
            fp_out = fp_ii.format(model, yearL[ii])
            del_list.append( par_loop_del( path.join( dir_download, fp_tmax_list[ii]),
                                           path.join( dir_download, fp_tmin_list[ii]),
                                           path.join( dir_download, fp_rmin_list[ii]),
                                           path.join( dir_download, fp_rmean_list[ii]),
                                           fp_out, fp_lookup) )

        # Compute all for this model in parallel
        dask.compute(*del_list)

        # Move files to google bucket; a failed upload is reported as a model
        # failure instead of being silently followed by deletion of the results.
        _run_quiet(f'gsutil -m cp {dir_tmp}/* gs://fqqzlp/carter2/hours_over_lh/', check=True)

    except Exception as exc:
        # Keep going with the next model, but say why this one failed.
        # KeyboardInterrupt / SystemExit are deliberately not caught here.
        print(f'Model failed: {model}')
        print(f'  Reason: {exc!r}')

    finally:
        # Remove temporary files whether the model succeeded or failed.
        # Best effort: rm exits non-zero when the directory is already empty.
        _run_quiet(f'rm {dir_tmp}/*')
        _run_quiet(f'rm {dir_download}/*')